Skip to content

Commit 26cf448

Browse files
committed
api: Add GET /telemetry/stats endpoint for aggregated statistics
Add aggregation endpoint that groups telemetry events by configurable fields (runtime, device_type, job_name, tree, branch, arch, kind, error_type) and returns pass/fail/incomplete/skip/infra_error counts. Supports filtering by kind, runtime, device_type, job_name, tree, branch, arch, and time range (since/until) before aggregation. Also adds a generic db.aggregate() method for running MongoDB aggregation pipelines. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 896d514 commit 26cf448

2 files changed

Lines changed: 106 additions & 0 deletions

File tree

api/db.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@ async def update(self, obj):
297297
raise ValueError(f"No object found with id: {obj.id}")
298298
return obj.__class__(**await col.find_one(ObjectId(obj.id)))
299299

300+
async def aggregate(self, model, pipeline):
    """Run an aggregation pipeline on a model's collection.

    *model* selects the backing collection; *pipeline* is a MongoDB
    aggregation pipeline (list of stage dicts).  The full result set is
    materialized and returned as a list (no length cap).
    """
    collection = self._get_collection(model)
    return await collection.aggregate(pipeline).to_list(length=None)
305+
300306
async def delete_by_id(self, model, obj_id):
301307
"""Delete one object matching a given id"""
302308
col = self._get_collection(model)

api/main.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,106 @@ async def get_telemetry(request: Request):
10401040
return paginated_resp
10411041

10421042

1043+
# Whitelist of event fields the /telemetry/stats endpoint accepts in the
# 'group_by' query parameter; anything else is rejected with a 400.
TELEMETRY_STATS_GROUP_FIELDS = {
    'runtime',
    'device_type',
    'job_name',
    'tree',
    'branch',
    'arch',
    'kind',
    'error_type',
}
1047+
1048+
1049+
@app.get('/telemetry/stats', tags=["telemetry"])
async def get_telemetry_stats(request: Request):
    """Get aggregated telemetry statistics.

    Query parameters:
    - group_by: Comma-separated fields to group by
        (runtime, device_type, job_name, tree, branch, arch,
        kind, error_type)
    - kind: Filter by event kind before aggregating
    - runtime/device_type/job_name/tree/branch/arch: exact-match
        filters applied before aggregating
    - since/until: Time range (ISO 8601)

    Returns grouped counts with pass/fail/incomplete/skip/infra_error
    breakdowns for result-bearing events.

    Raises HTTP 400 when 'group_by' is missing, names an unknown
    field, or when 'since'/'until' are not valid ISO 8601 timestamps.
    """
    metrics.add('http_requests_total', 1)
    query_params = dict(request.query_params)

    group_by_str = query_params.pop('group_by', None)
    if not group_by_str:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="'group_by' parameter is required",
        )
    # Drop empty segments so a stray trailing comma ("runtime,") does not
    # surface as a confusing "Invalid group_by fields: {''}" error
    group_by = [f.strip() for f in group_by_str.split(',') if f.strip()]
    if not group_by:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="'group_by' parameter is required",
        )
    invalid = set(group_by) - TELEMETRY_STATS_GROUP_FIELDS
    if invalid:
        # Sort for a deterministic error message (set repr order varies)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid group_by fields: {sorted(invalid)}",
        )

    # Exact-match filters applied before the $group stage
    match_stage = {}
    for key in ('kind', 'runtime', 'device_type', 'job_name',
                'tree', 'branch', 'arch'):
        val = query_params.pop(key, None)
        if val:
            match_stage[key] = val

    since = query_params.pop('since', None)
    until = query_params.pop('until', None)
    if since or until:
        ts_filter = {}
        # A malformed timestamp is a client error — report 400 rather
        # than letting ValueError bubble up as a 500
        try:
            if since:
                ts_filter['$gte'] = datetime.fromisoformat(since)
            if until:
                ts_filter['$lte'] = datetime.fromisoformat(until)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid timestamp: {exc}",
            ) from exc
        match_stage['ts'] = ts_filter

    group_id = {f: f'${f}' for f in group_by}

    def _result_count(value):
        # Count events whose 'result' field equals *value*
        return {'$sum': {'$cond': [{'$eq': ['$result', value]}, 1, 0]}}

    pipeline = []
    if match_stage:
        pipeline.append({'$match': match_stage})
    pipeline.append({
        '$group': {
            '_id': group_id,
            'total': {'$sum': 1},
            'pass': _result_count('pass'),
            'fail': _result_count('fail'),
            'incomplete': _result_count('incomplete'),
            'skip': _result_count('skip'),
            # is_infra_error is a boolean flag; missing counts as false
            'infra_error': {'$sum': {
                '$cond': ['$is_infra_error', 1, 0]
            }},
        }
    })
    pipeline.append({'$sort': {'total': -1}})

    results = await db.aggregate(TelemetryEvent, pipeline)

    # Flatten _id into top-level fields
    output = []
    for doc in results:
        row = doc['_id'].copy()
        for field in ('total', 'pass', 'fail', 'incomplete',
                      'skip', 'infra_error'):
            row[field] = doc[field]
        output.append(row)

    return JSONResponse(content=jsonable_encoder(output))
1141+
1142+
10431143
# -----------------------------------------------------------------------------
10441144
# Nodes
10451145
def _get_node_event_data(operation, node, is_hierarchy=False):

0 commit comments

Comments
 (0)