Skip to content

Commit 364d604

Browse files
committed
api: Add POST /telemetry endpoint for bulk event ingestion
Add endpoint for pipeline services to submit telemetry events in bulk. Validates each event against the TelemetryEvent model and uses insert_many for efficient batch insertion. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 6563962 commit 364d604

1 file changed

Lines changed: 41 additions & 0 deletions

File tree

api/main.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
parse_node_obj,
5454
KernelVersion,
5555
EventHistory,
56+
TelemetryEvent,
5657
)
5758
from .auth import Authentication
5859
from .db import Database
@@ -953,6 +954,46 @@ async def get_events(request: Request):
953954
return JSONResponse(content=json_comp)
954955

955956

957+
# -----------------------------------------------------------------------------
958+
# Telemetry of pipeline execution and other events(not node stuff).
959+
# This is a separate collection from
960+
# EventHistory since it may have a much higher volume and different query patterns,
961+
# and we want to be able to optimize indexes and storage separately.
962+
963+
@app.post('/telemetry', response_model=dict, tags=["telemetry"])
964+
async def post_telemetry(
965+
events: List[dict],
966+
current_user: User = Depends(get_current_user),
967+
):
968+
"""Bulk insert telemetry events.
969+
970+
Accepts a list of telemetry event dicts. Each event must have at
971+
least 'kind' and 'runtime' fields. Events are validated against
972+
the TelemetryEvent model before insertion.
973+
"""
974+
metrics.add('http_requests_total', 1)
975+
if not events:
976+
raise HTTPException(
977+
status_code=status.HTTP_400_BAD_REQUEST,
978+
detail="Events list cannot be empty",
979+
)
980+
col = db._get_collection(TelemetryEvent)
981+
docs = []
982+
for event in events:
983+
try:
984+
obj = TelemetryEvent(**event)
985+
except Exception as exc:
986+
raise HTTPException(
987+
status_code=status.HTTP_400_BAD_REQUEST,
988+
detail=f"Invalid telemetry event: {exc}",
989+
) from exc
990+
doc = obj.model_dump(by_alias=True)
991+
doc.pop('_id', None)
992+
docs.append(doc)
993+
result = await col.insert_many(docs)
994+
return {"inserted": len(result.inserted_ids)}
995+
996+
956997
# -----------------------------------------------------------------------------
957998
# Nodes
958999
def _get_node_event_data(operation, node, is_hierarchy=False):

0 commit comments

Comments
 (0)