Skip to content

Commit 9069585

Browse files
committed
api: Add GET /telemetry/anomalies endpoint for anomaly detection
Add on-demand anomaly detection endpoint that identifies: 1. Runtime+device_type combinations with high infra error or failure rates exceeding a configurable threshold 2. Runtimes with recurring submission/connectivity errors Parameters: window (1h-48h), threshold (0.0-1.0), min_total (minimum event count to avoid noise from small samples). Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 26cf448 commit 9069585

1 file changed

Lines changed: 138 additions & 0 deletions

File tree

api/main.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,13 @@ async def get_telemetry(request: Request):
10501050
async def get_telemetry_stats(request: Request):
10511051
"""Get aggregated telemetry statistics.
10521052
1053+
This is rule-based anomaly detection using
1054+
thresholded empirical rates computed over
1055+
a sliding (rolling) time window.
1056+
This is not a full anomaly detection system
1057+
with baselines or machine learning, but at
1058+
least something to start with.
1059+
10531060
Query parameters:
10541061
- group_by: Comma-separated fields to group by
10551062
(runtime, device_type, job_name, tree, branch, arch,
@@ -1139,6 +1146,137 @@ async def get_telemetry_stats(request: Request):
11391146

11401147
return JSONResponse(content=jsonable_encoder(output))
11411148

1149+
# Trial sizing — adjust once real query patterns and volumes are known.
# Maps the accepted `window` query values to their length in hours.
ANOMALY_WINDOW_MAP = {f'{hours}h': hours for hours in (1, 3, 6, 12, 24, 48)}
1153+
1154+
1155+
@app.get('/telemetry/anomalies', tags=["telemetry"])
async def get_telemetry_anomalies(
    window: str = Query(
        '6h', description='Time window: 1h, 3h, 6h, 12h, 24h, 48h'
    ),
    threshold: float = Query(
        0.5, ge=0.0, le=1.0,
        description='Min failure/infra error rate to flag (0.0-1.0)'
    ),
    min_total: int = Query(
        3, ge=1,
        description='Min events in window to consider (avoids noise)'
    ),
):
    """Detect anomalies in telemetry data.

    Rule-based detection using thresholded empirical rates over a
    sliding time window:

    1. runtime+device_type combinations whose infra error rate or
       failure rate (fail + incomplete) is >= ``threshold``
    2. runtimes with recurring submission/connectivity errors
       (``runtime_error`` / ``job_skip`` events)

    Groups with fewer than ``min_total`` events in the window are
    ignored to avoid noise from small samples.

    Returns a JSON object echoing the effective parameters plus
    ``result_anomalies`` (sorted by infra_rate, then fail_rate,
    descending) and ``error_anomalies`` (sorted by count, descending).

    Raises HTTP 400 when ``window`` is not one of the keys of
    ANOMALY_WINDOW_MAP.
    """
    # Local import keeps this fix self-contained; `datetime` and
    # `timedelta` are presumably imported at module level already.
    from datetime import timezone

    metrics.add('http_requests_total', 1)

    hours = ANOMALY_WINDOW_MAP.get(window)
    if not hours:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid window '{window}'. "
                   f"Use: {', '.join(ANOMALY_WINDOW_MAP.keys())}",
        )
    # datetime.utcnow() is deprecated since Python 3.12 and returns a
    # naive datetime; use an aware UTC timestamp instead.  PyMongo
    # encodes both forms to the same BSON UTC datetime, so the $gte
    # match below is unaffected.  Note `since.isoformat()` in the
    # response now carries an explicit "+00:00" offset.
    since = datetime.now(timezone.utc) - timedelta(hours=hours)

    # Anomaly 1: high infra error / failure rate per runtime+device_type.
    result_pipeline = [
        {'$match': {
            'kind': {'$in': ['job_result', 'test_result']},
            'ts': {'$gte': since},
        }},
        {'$group': {
            '_id': {
                'runtime': '$runtime',
                'device_type': '$device_type',
            },
            'total': {'$sum': 1},
            'fail': {'$sum': {
                '$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
            }},
            'incomplete': {'$sum': {
                '$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
            }},
            'infra_error': {'$sum': {
                '$cond': ['$is_infra_error', 1, 0]
            }},
        }},
        # Drop small samples before computing rates (also guards the
        # $divide below against a zero denominator).
        {'$match': {'total': {'$gte': min_total}}},
        {'$addFields': {
            'infra_rate': {
                '$divide': ['$infra_error', '$total']
            },
            # 'incomplete' results count towards the failure rate.
            'fail_rate': {
                '$divide': [
                    {'$add': ['$fail', '$incomplete']}, '$total'
                ]
            },
        }},
        {'$match': {
            '$or': [
                {'infra_rate': {'$gte': threshold}},
                {'fail_rate': {'$gte': threshold}},
            ]
        }},
        {'$sort': {'infra_rate': -1, 'fail_rate': -1}},
    ]

    # Anomaly 2: submission/connectivity errors per runtime.
    error_pipeline = [
        {'$match': {
            'kind': {'$in': ['runtime_error', 'job_skip']},
            'ts': {'$gte': since},
        }},
        {'$group': {
            '_id': {
                'runtime': '$runtime',
                'error_type': '$error_type',
            },
            'count': {'$sum': 1},
        }},
        {'$match': {'count': {'$gte': min_total}}},
        {'$sort': {'count': -1}},
    ]

    result_anomalies = await db.aggregate(
        TelemetryEvent, result_pipeline
    )
    error_anomalies = await db.aggregate(
        TelemetryEvent, error_pipeline
    )

    output = {
        'window': window,
        'threshold': threshold,
        'min_total': min_total,
        'since': since.isoformat(),
        'result_anomalies': [],
        'error_anomalies': [],
    }

    # Flatten each grouped _id (runtime/device_type) into a flat row
    # alongside its counters and rounded rates.
    for doc in result_anomalies:
        row = doc['_id'].copy()
        row['total'] = doc['total']
        row['fail'] = doc['fail']
        row['incomplete'] = doc['incomplete']
        row['infra_error'] = doc['infra_error']
        row['infra_rate'] = round(doc['infra_rate'], 3)
        row['fail_rate'] = round(doc['fail_rate'], 3)
        output['result_anomalies'].append(row)

    # Same flattening for the runtime/error_type groups.
    for doc in error_anomalies:
        row = doc['_id'].copy()
        row['count'] = doc['count']
        output['error_anomalies'].append(row)

    return JSONResponse(content=jsonable_encoder(output))
1279+
11421280

11431281
# -----------------------------------------------------------------------------
11441282
# Nodes

0 commit comments

Comments
 (0)