@@ -18,6 +18,7 @@ function validateEvent(event) {
1818 'source' ,
1919 'specversion' ,
2020 'type' ,
21+ 'plane' ,
2122 'subject' ,
2223 'time' ,
2324 'datacontenttype' ,
@@ -49,10 +50,10 @@ function validateEvent(event) {
4950}
5051
5152async function sendToEventBridge ( events , eventBusArn ) {
52- // console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`) ;
53+ const failedEvents = [ ] ;
5354
54- const failedEvents = [ ] ;
55- for ( let i = 0 ; i < events . length ; i += EVENTBRIDGE_MAX_BATCH_SIZE ) {
55+ for ( let i = 0 ; i < events . length ; i += EVENTBRIDGE_MAX_BATCH_SIZE ) {
56+ console . debug ( `Sending ${ events . length } events to EventBridge: ${ eventBusArn } ` ) ;
5657 const batch = events . slice ( i , i + EVENTBRIDGE_MAX_BATCH_SIZE ) ;
5758 const entries = batch . map ( event => ( {
5859 Source : 'custom.event' ,
@@ -64,7 +65,7 @@ async function sendToEventBridge(events, eventBusArn) {
6465 let attempts = 0 ;
6566 while ( attempts < MAX_RETRIES ) {
6667 try {
67- // console.info (`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);
68+ console . debug ( `Attempt ${ attempts + 1 } : Sending batch of ${ entries . length } events.` ) ;
6869
6970 const response = await eventBridge . send ( new PutEventsCommand ( { Entries : entries } ) ) ;
7071 response . FailedEntryCount && response . Entries . forEach ( ( entry , idx ) => {
@@ -92,15 +93,14 @@ async function sendToEventBridge(events, eventBusArn) {
9293}
9394
9495async function sendToDLQ ( events ) {
95- console . warn ( `Sending ${ events . length } failed events to DLQ` ) ;
96-
97- for ( const event of events ) {
96+ for ( const event of events ) {
97+ console . warn ( `Sending ${ events . length } failed events to DLQ` ) ;
9898 await sqs . send ( new SendMessageCommand ( { QueueUrl : DLQ_URL , MessageBody : JSON . stringify ( event ) } ) ) ;
9999 }
100100}
101101
102102exports . handler = async ( snsEvent ) => {
103- // console.info (`Received SNS event with ${snsEvent.Records.length} records.`);
103+ console . debug ( `Received SNS event with ${ snsEvent . Records . length } records.` ) ;
104104
105105 if ( THROTTLE_DELAY_MS > 0 ) {
106106 console . info ( `Throttling enabled. Delaying processing by ${ THROTTLE_DELAY_MS } ms` ) ;
@@ -111,14 +111,14 @@ exports.handler = async (snsEvent) => {
111111 const validEvents = records . filter ( validateEvent ) ;
112112 const invalidEvents = records . filter ( event => ! validateEvent ( event ) ) ;
113113
114- // console.info (`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
114+ console . debug ( `Valid events: ${ validEvents . length } , Invalid events: ${ invalidEvents . length } ` ) ;
115115
116116 if ( invalidEvents . length ) await sendToDLQ ( invalidEvents ) ;
117117
118- const dataEvents = validEvents . filter ( event => event . type === 'data' ) ;
119- const controlEvents = validEvents . filter ( event => event . type === 'control' ) ;
118+ const dataEvents = validEvents . filter ( event => event . plane === 'data' ) ;
119+ const controlEvents = validEvents . filter ( event => event . plane === 'control' ) ;
120120
121- // console.info (`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
121+ console . debug ( `Data events: ${ dataEvents . length } , Control events: ${ controlEvents . length } ` ) ;
122122
123123 const failedDataEvents = await sendToEventBridge ( dataEvents , DATA_PLANE_EVENT_BUS_ARN ) ;
124124 const failedControlEvents = await sendToEventBridge ( controlEvents , CONTROL_PLANE_EVENT_BUS_ARN ) ;
0 commit comments