@@ -27,7 +27,10 @@ function validateEvent(event) {
2727 'data'
2828 ] ;
2929 // Check top-level required fields
30- if ( ! requiredFields . every ( field => event . hasOwnProperty ( field ) ) ) {
30+ const missingFields = requiredFields . filter ( field => ! event . hasOwnProperty ( field ) ) ;
31+
32+ if ( missingFields . length > 0 ) {
33+ console . error ( `Event validation failed. Missing required fields: ${ missingFields . join ( ', ' ) } . EventID: ${ event . id || 'unknown' } , EventType: ${ event . type || 'unknown' } ` ) ;
3134 return false ;
3235 }
3336 return true ;
@@ -54,19 +57,20 @@ async function sendToEventBridge(events, eventBusArn) {
5457 const response = await eventBridge . send ( new PutEventsCommand ( { Entries : entries } ) ) ;
5558 response . FailedEntryCount && response . Entries . forEach ( ( entry , idx ) => {
5659 if ( entry . ErrorCode ) {
57- console . warn ( `Event failed with error : ${ entry . ErrorCode } ` ) ;
60+ console . error ( `Event failed to send to EventBridge. ErrorCode : ${ entry . ErrorCode } , ErrorMessage: ${ entry . ErrorMessage } , EventID: ${ batch [ idx ] . id } , EventType: ${ batch [ idx ] . type } ` ) ;
5861 failedEvents . push ( batch [ idx ] ) ;
5962 }
6063 } ) ;
6164 break ;
6265 } catch ( error ) {
63- console . error ( `EventBridge send error: ${ error } ` ) ;
66+ console . error ( `EventBridge send error: ${ error . name } , Message: ${ error . message } , Code: ${ error . $metadata ?. httpStatusCode } , RequestId: ${ error . $metadata ?. requestId } ` ) ;
6467
6568 if ( error . retryable ) {
6669 console . warn ( `Retrying after backoff: attempt ${ attempts + 1 } ` ) ;
6770 await new Promise ( res => setTimeout ( res , 2 ** attempts * 100 ) ) ;
6871 attempts ++ ;
6972 } else {
73+ console . error ( `Non-retryable error encountered. Moving ${ batch . length } events to DLQ` ) ;
7074 failedEvents . push ( ...batch ) ;
7175 break ;
7276 }
@@ -77,9 +81,17 @@ async function sendToEventBridge(events, eventBusArn) {
7781}
7882
7983async function sendToDLQ ( events ) {
84+ if ( events . length === 0 ) return ;
85+
86+ console . warn ( `Sending ${ events . length } failed event(s) to DLQ: ${ DLQ_URL } ` ) ;
87+
8088 for ( const event of events ) {
81- console . warn ( `Sending ${ events . length } failed events to DLQ` ) ;
82- await sqs . send ( new SendMessageCommand ( { QueueUrl : DLQ_URL , MessageBody : JSON . stringify ( event ) } ) ) ;
89+ try {
90+ await sqs . send ( new SendMessageCommand ( { QueueUrl : DLQ_URL , MessageBody : JSON . stringify ( event ) } ) ) ;
91+ console . debug ( `Successfully sent event ${ event . id } to DLQ` ) ;
92+ } catch ( error ) {
93+ console . error ( `Failed to send event ${ event . id } to DLQ - Name: ${ error . name } , Message: ${ error . message } , Code: ${ error . $metadata ?. httpStatusCode } , RequestId: ${ error . $metadata ?. requestId } ` ) ;
94+ }
8395 }
8496}
8597
@@ -96,8 +108,10 @@ exports.handler = async (snsEvent) => {
96108 const invalidEvents = records . filter ( event => ! validateEvent ( event ) ) ;
97109
98110 console . debug ( `Valid events: ${ validEvents . length } , Invalid events: ${ invalidEvents . length } ` ) ;
99-
100- if ( invalidEvents . length ) await sendToDLQ ( invalidEvents ) ;
111+ if ( invalidEvents . length ) {
112+ console . warn ( `${ invalidEvents . length } event(s) failed validation and will be sent to DLQ` ) ;
113+ await sendToDLQ ( invalidEvents ) ;
114+ }
101115
102116 const dataEvents = validEvents . filter ( event => event . plane === 'data' ) ;
103117 const controlEvents = validEvents . filter ( event => event . plane === 'control' ) ;
0 commit comments