@@ -1512,11 +1512,92 @@ export class ClickhouseEventRepository implements IEventRepository {
15121512 async getRunEvents (
15131513 storeTable : TaskEventStoreTable ,
15141514 environmentId : string ,
1515+ traceId : string ,
15151516 runId : string ,
15161517 startCreatedAt : Date ,
15171518 endCreatedAt ?: Date
15181519 ) : Promise < RunPreparedEvent [ ] > {
1519- throw new Error ( "ClickhouseEventRepository.getRunEvents not implemented" ) ;
1520+ const startCreatedAtWithBuffer = new Date ( startCreatedAt . getTime ( ) - 1000 ) ;
1521+
1522+ const queryBuilder = this . _clickhouse . taskEvents . traceSummaryQueryBuilder ( ) ;
1523+
1524+ queryBuilder . where ( "environment_id = {environmentId: String}" , { environmentId } ) ;
1525+ queryBuilder . where ( "trace_id = {traceId: String}" , { traceId } ) ;
1526+ queryBuilder . where ( "run_id = {runId: String}" , { runId } ) ;
1527+ queryBuilder . where ( "start_time >= {startCreatedAt: String}" , {
1528+ startCreatedAt : convertDateToNanoseconds ( startCreatedAtWithBuffer ) . toString ( ) ,
1529+ } ) ;
1530+
1531+ if ( endCreatedAt ) {
1532+ queryBuilder . where ( "start_time <= {endCreatedAt: String}" , {
1533+ endCreatedAt : convertDateToNanoseconds ( endCreatedAt ) . toString ( ) ,
1534+ } ) ;
1535+ }
1536+
1537+ queryBuilder . where ( "kind != {kind: String}" , { kind : "DEBUG_EVENT" } ) ;
1538+ queryBuilder . orderBy ( "start_time ASC" ) ;
1539+
1540+ if ( this . _config . maximumTraceSummaryViewCount ) {
1541+ queryBuilder . limit ( this . _config . maximumTraceSummaryViewCount ) ;
1542+ }
1543+
1544+ const [ queryError , records ] = await queryBuilder . execute ( ) ;
1545+
1546+ if ( queryError ) {
1547+ throw queryError ;
1548+ }
1549+
1550+ if ( ! records ) {
1551+ return [ ] ;
1552+ }
1553+
1554+ const recordsGroupedBySpanId = records . reduce ( ( acc , record ) => {
1555+ acc [ record . span_id ] = [ ...( acc [ record . span_id ] ?? [ ] ) , record ] ;
1556+ return acc ;
1557+ } , { } as Record < string , TaskEventSummaryV1Result [ ] > ) ;
1558+
1559+ const spanSummaries = new Map < string , SpanSummary > ( ) ;
1560+
1561+ for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
1562+ const spanSummary = this . #mergeRecordsIntoSpanSummary( spanId , spanRecords ) ;
1563+
1564+ if ( ! spanSummary ) {
1565+ continue ;
1566+ }
1567+
1568+ spanSummaries . set ( spanId , spanSummary ) ;
1569+ }
1570+
1571+ const spans = Array . from ( spanSummaries . values ( ) ) ;
1572+
1573+ const overridesBySpanId : Record < string , SpanOverride > = { } ;
1574+
1575+ const finalSpans = spans . map ( ( span ) => {
1576+ return this . #applyAncestorOverrides( span , spanSummaries , overridesBySpanId ) ;
1577+ } ) ;
1578+
1579+ const runPreparedEvents = finalSpans . map ( ( span ) => this . #spanSummaryToRunPreparedEvent( span ) ) ;
1580+
1581+ return runPreparedEvents ;
1582+ }
1583+
1584+ #spanSummaryToRunPreparedEvent( span : SpanSummary ) : RunPreparedEvent {
1585+ return {
1586+ spanId : span . id ,
1587+ parentId : span . parentId ?? null ,
1588+ runId : span . runId ,
1589+ message : span . data . message ,
1590+ style : span . data . style ,
1591+ events : span . data . events ,
1592+ startTime : convertDateToNanoseconds ( span . data . startTime ) ,
1593+ duration : span . data . duration ,
1594+ isError : span . data . isError ,
1595+ isPartial : span . data . isPartial ,
1596+ isCancelled : span . data . isCancelled ,
1597+ kind : "UNSPECIFIED" ,
1598+ attemptNumber : span . data . attemptNumber ?? null ,
1599+ level : span . data . level ,
1600+ } ;
15201601 }
15211602}
15221603
0 commit comments