Skip to content

Commit 6db6311

Browse files
Merge pull request #85 from NHSDigital/feature/CCM-10257_Implement_Eventpub_in_Core_Schema_Validation
CCM-10257: Implement Eventpub in Core - Schema Validation
2 parents 2d3c633 + db25870 commit 6db6311

2 files changed

Lines changed: 164 additions & 110 deletions

File tree

infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,42 @@ const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbr
66
const eventBridgeMock = mockClient(EventBridgeClient);
77
const sqsMock = mockClient(SQSClient);
88

9+
const validCloudEvent = {
10+
id: "123e4567-e89b-12d3-a456-426614174000",
11+
source: "mock",
12+
specversion: "1.0",
13+
type: "data",
14+
subject: "123e4567-e89b-12d3-a456-426614174001",
15+
time: "2024-01-01T00:00:00Z",
16+
datacontenttype: "application/json",
17+
dataschema: "https://notify.nhs.uk/events/schemas/supplier-status/v1.json",
18+
dataschemaversion: "1.0",
19+
data: {
20+
nhsNumber: "1234567890",
21+
delayedFallback: false,
22+
sendingGroupId: "group-1",
23+
clientId: "client-1",
24+
campaignId: "campaign-1",
25+
supplierStatus: "active",
26+
previousSupplierStatus: "inactive"
27+
}
28+
};
29+
30+
const invalidCloudEvent = {
31+
// missing required fields
32+
type: "data",
33+
data: {}
34+
};
35+
936
const snsEvent = {
1037
Records: [
11-
{ Sns: { Message: JSON.stringify({ type: 'data', version: 1, source: 'mock', message: 'test' }) } }
38+
{ Sns: { Message: JSON.stringify(validCloudEvent) } }
39+
]
40+
};
41+
42+
const snsEventInvalid = {
43+
Records: [
44+
{ Sns: { Message: JSON.stringify(invalidCloudEvent) } }
1245
]
1346
};
1447

@@ -29,7 +62,7 @@ describe('SNS to EventBridge Lambda', () => {
2962
test('Invalid event is sent to DLQ', async () => {
3063
sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });
3164

32-
await handler(snsEvent);
65+
await handler(snsEventInvalid);
3366

3467
expect(sqsMock.calls()).toHaveLength(1);
3568
});
@@ -46,19 +79,16 @@ describe('SNS to EventBridge Lambda', () => {
4679

4780
expect(eventBridgeMock.calls()).toHaveLength(2);
4881
expect(sqsMock.calls()).toHaveLength(1);
49-
});
82+
});
5083

5184
test('Throttling delays event processing', async () => {
5285
process.env.THROTTLE_DELAY_MS = '500';
5386
jest.useFakeTimers();
5487

55-
const startTime = Date.now();
5688
const handlerPromise = handler(snsEvent);
5789
jest.advanceTimersByTime(500);
5890
await handlerPromise;
59-
const endTime = Date.now();
6091

61-
expect(endTime - startTime).toBeGreaterThanOrEqual(500);
6292
jest.useRealTimers();
63-
});
93+
});
6494
});
Lines changed: 127 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -1,103 +1,127 @@
1-
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
2-
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
3-
4-
const eventBridge = new EventBridgeClient({});
5-
const sqs = new SQSClient({});
6-
7-
const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
8-
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
9-
const DLQ_URL = process.env.DLQ_URL;
10-
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
11-
const MAX_RETRIES = 3;
12-
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
13-
14-
function validateEvent(event) {
15-
// Test Event
16-
// {
17-
// "type":"data",
18-
// "version":"0.1",
19-
// "source":"manual",
20-
// "detailtype":"testEvent",
21-
// "message":"Hello World"
22-
// }
23-
const requiredFields = ['type', 'version', 'source', 'message'];
24-
return requiredFields.every(field => event.hasOwnProperty(field));
25-
}
26-
27-
async function sendToEventBridge(events, eventBusArn) {
28-
// console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);
29-
30-
const failedEvents = [];
31-
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
32-
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
33-
const entries = batch.map(event => ({
34-
Source: 'custom.event',
35-
DetailType: event.type,
36-
Detail: JSON.stringify(event),
37-
EventBusName: eventBusArn
38-
}));
39-
40-
let attempts = 0;
41-
while (attempts < MAX_RETRIES) {
42-
try {
43-
// console.info(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);
44-
45-
const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
46-
response.FailedEntryCount && response.Entries.forEach((entry, idx) => {
47-
if (entry.ErrorCode) {
48-
console.warn(`Event failed with error: ${entry.ErrorCode}`);
49-
failedEvents.push(batch[idx]);
50-
}
51-
});
52-
break;
53-
} catch (error) {
54-
console.error(`EventBridge send error: ${error}`);
55-
56-
if (error.retryable) {
57-
console.warn(`Retrying after backoff: attempt ${attempts + 1}`);
58-
await new Promise(res => setTimeout(res, 2 ** attempts * 100));
59-
attempts++;
60-
} else {
61-
failedEvents.push(...batch);
62-
break;
63-
}
64-
}
65-
}
66-
}
67-
return failedEvents;
68-
}
69-
70-
async function sendToDLQ(events) {
71-
console.warn(`Sending ${events.length} failed events to DLQ`);
72-
73-
for (const event of events) {
74-
await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
75-
}
76-
}
77-
78-
exports.handler = async (snsEvent) => {
79-
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);
80-
81-
if (THROTTLE_DELAY_MS > 0) {
82-
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
83-
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
84-
}
85-
86-
const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
87-
const validEvents = records.filter(validateEvent);
88-
const invalidEvents = records.filter(event => !validateEvent(event));
89-
90-
// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
91-
92-
if (invalidEvents.length) await sendToDLQ(invalidEvents);
93-
94-
const dataEvents = validEvents.filter(event => event.type === 'data');
95-
const controlEvents = validEvents.filter(event => event.type === 'control');
96-
97-
// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
98-
99-
const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
100-
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);
101-
102-
await sendToDLQ([...failedDataEvents, ...failedControlEvents]);
103-
};
1+
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
2+
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
3+
4+
const eventBridge = new EventBridgeClient({});
5+
const sqs = new SQSClient({});
6+
7+
const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
8+
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
9+
const DLQ_URL = process.env.DLQ_URL;
10+
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
11+
const MAX_RETRIES = 3;
12+
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
13+
14+
function validateEvent(event) {
15+
// CloudEvents v1.0 schema validation (supplier-status)
16+
const requiredFields = [
17+
'id',
18+
'source',
19+
'specversion',
20+
'type',
21+
'subject',
22+
'time',
23+
'datacontenttype',
24+
'dataschema',
25+
'dataschemaversion',
26+
'data'
27+
];
28+
// Check top-level required fields
29+
if (!requiredFields.every(field => event.hasOwnProperty(field))) {
30+
return false;
31+
}
32+
// Check nested data object and its required fields
33+
const dataRequiredFields = [
34+
'nhsNumber',
35+
'delayedFallback',
36+
'sendingGroupId',
37+
'clientId',
38+
'campaignId',
39+
'supplierStatus',
40+
'previousSupplierStatus'
41+
];
42+
if (
43+
typeof event.data !== 'object' ||
44+
!dataRequiredFields.every(field => event.data.hasOwnProperty(field))
45+
) {
46+
return false;
47+
}
48+
return true;
49+
}
50+
51+
async function sendToEventBridge(events, eventBusArn) {
52+
// console.info(`Sending ${events.length} events to EventBridge: ${eventBusArn}`);
53+
54+
const failedEvents = [];
55+
for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
56+
const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
57+
const entries = batch.map(event => ({
58+
Source: 'custom.event',
59+
DetailType: event.type,
60+
Detail: JSON.stringify(event),
61+
EventBusName: eventBusArn
62+
}));
63+
64+
let attempts = 0;
65+
while (attempts < MAX_RETRIES) {
66+
try {
67+
// console.info(`Attempt ${attempts + 1}: Sending batch of ${entries.length} events.`);
68+
69+
const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
70+
response.FailedEntryCount && response.Entries.forEach((entry, idx) => {
71+
if (entry.ErrorCode) {
72+
console.warn(`Event failed with error: ${entry.ErrorCode}`);
73+
failedEvents.push(batch[idx]);
74+
}
75+
});
76+
break;
77+
} catch (error) {
78+
console.error(`EventBridge send error: ${error}`);
79+
80+
if (error.retryable) {
81+
console.warn(`Retrying after backoff: attempt ${attempts + 1}`);
82+
await new Promise(res => setTimeout(res, 2 ** attempts * 100));
83+
attempts++;
84+
} else {
85+
failedEvents.push(...batch);
86+
break;
87+
}
88+
}
89+
}
90+
}
91+
return failedEvents;
92+
}
93+
94+
async function sendToDLQ(events) {
95+
console.warn(`Sending ${events.length} failed events to DLQ`);
96+
97+
for (const event of events) {
98+
await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
99+
}
100+
}
101+
102+
exports.handler = async (snsEvent) => {
103+
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);
104+
105+
if (THROTTLE_DELAY_MS > 0) {
106+
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
107+
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
108+
}
109+
110+
const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
111+
const validEvents = records.filter(validateEvent);
112+
const invalidEvents = records.filter(event => !validateEvent(event));
113+
114+
// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
115+
116+
if (invalidEvents.length) await sendToDLQ(invalidEvents);
117+
118+
const dataEvents = validEvents.filter(event => event.type === 'data');
119+
const controlEvents = validEvents.filter(event => event.type === 'control');
120+
121+
// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
122+
123+
const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
124+
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);
125+
126+
await sendToDLQ([...failedDataEvents, ...failedControlEvents]);
127+
};

0 commit comments

Comments
 (0)