Skip to content

Commit 0604a14

Browse files
Merge pull request #47 from NHSDigital/CCM-8569_CCM-8570_updatesToEventPub
CCM-8569 CCM-8570 updates to event pub
2 parents f897562 + a878269 commit 0604a14

8 files changed

Lines changed: 1593 additions & 83 deletions

File tree

infrastructure/modules/eventpub/iam_role_lambda.tf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,19 @@ data "aws_iam_policy_document" "lambda" {
6767
]
6868
}
6969

70+
statement {
71+
sid = "DLQPutMessage"
72+
effect = "Allow"
73+
74+
actions = [
75+
"sqs:SendMessage",
76+
]
77+
78+
resources = [
79+
aws_sqs_queue.dlq.arn
80+
]
81+
}
82+
7083
statement {
7184
sid = "KMSCloudwatchKeyAccess"
7285
effect = "Allow"

infrastructure/modules/eventpub/lambda/eventpub/package-lock.json

Lines changed: 1401 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

infrastructure/modules/eventpub/lambda/eventpub/package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,10 @@
1616
]
1717
},
1818
"author": "NHS Notify",
19-
"license": "ISC"
19+
"license": "ISC",
20+
"dependencies": {
21+
"@aws-sdk/client-eventbridge": "^3.744.0",
22+
"@aws-sdk/client-sqs": "^3.744.0",
23+
"aws-sdk-client-mock": "^4.1.0"
24+
}
2025
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
const { handler } = require('../index.js');
2+
const { mockClient } = require('aws-sdk-client-mock');
3+
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
4+
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
5+
6+
const eventBridgeMock = mockClient(EventBridgeClient);
7+
const sqsMock = mockClient(SQSClient);
8+
9+
const snsEvent = {
10+
Records: [
11+
{ Sns: { Message: JSON.stringify({ type: 'data', version: 1, source: 'mock', message: 'test' }) } }
12+
]
13+
};
14+
15+
// Jest suite for the SNS -> EventBridge publisher handler.
// Both AWS clients are replaced by aws-sdk-client-mock, so no network calls are made.
describe('SNS to EventBridge Lambda', () => {
  beforeEach(() => {
    // Drop recorded calls and stubbed behaviours so each test starts clean.
    eventBridgeMock.reset();
    sqsMock.reset();
  });

  test('Valid event is sent to the correct EventBridge bus', async () => {
    eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });

    await handler(snsEvent);

    expect(eventBridgeMock.calls()).toHaveLength(1);
  });

  test('Invalid event is sent to DLQ', async () => {
    // `version`, `source` and `message` are missing, so validation must reject this message.
    // (Previously this test passed the valid `snsEvent` and never exercised validation.)
    const invalidSnsEvent = {
      Records: [{ Sns: { Message: JSON.stringify({ type: 'data' }) } }],
    };
    sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });

    await handler(invalidSnsEvent);

    // An invalid event goes straight to the DLQ and must never reach EventBridge.
    expect(eventBridgeMock.calls()).toHaveLength(0);
    expect(sqsMock.calls()).toHaveLength(1);
  });

  test('Retries on EventBridge failure and sends failed events to DLQ', async () => {
    // First call rejects with a retryable error, the retry succeeds but reports one failed entry.
    eventBridgeMock
      .on(PutEventsCommand)
      .rejectsOnce(Object.assign(new Error('Rate limit exceeded'), { retryable: true }))
      .resolves({ FailedEntryCount: 1, Entries: [{ ErrorCode: 'InternalFailure' }] });
    sqsMock.on(SendMessageCommand).resolves({ MessageId: '123' });

    await handler(snsEvent);

    expect(eventBridgeMock.calls()).toHaveLength(2);
    expect(sqsMock.calls()).toHaveLength(1);
  });

  test('Throttling delays event processing', async () => {
    // NOTE(review): the handler module was already required at the top of this file. If it
    // reads THROTTLE_DELAY_MS once at load time, this assignment has no effect on the handler
    // under test and the elapsed-time assertion is satisfied by the fake clock alone — confirm,
    // and reload the module with the variable set if the delay is meant to be exercised.
    process.env.THROTTLE_DELAY_MS = '500';
    jest.useFakeTimers();

    try {
      const startTime = Date.now();
      const handlerPromise = handler(snsEvent);
      jest.advanceTimersByTime(500);
      await handlerPromise;
      const endTime = Date.now();

      expect(endTime - startTime).toBeGreaterThanOrEqual(500);
    } finally {
      // Always restore global state so a failure here cannot leak into other tests.
      jest.useRealTimers();
      delete process.env.THROTTLE_DELAY_MS;
    }
  });
});
Lines changed: 99 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,103 @@
1-
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
2-
const eventBridgeClient = new EventBridgeClient();
3-
4-
exports.handler = async (event) => {
5-
try {
6-
const controlEventBusArn = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
7-
const dataEventBusArn = process.env.DATA_PLANE_EVENT_BUS_ARN;
8-
9-
if (!event.Records || !Array.isArray(event.Records)) {
10-
throw new Error("Invalid event format. Expected an array of records.");
11-
}
12-
13-
const batchSize = 10; // AWS EventBridge allows up to 10 entries per PutEvents request
14-
const entries = [];
15-
16-
for (const record of event.Records) {
17-
try {
18-
const snsMessage = JSON.parse(record.Sns.Message);
19-
if (!validateEvent(snsMessage)) {
20-
throw new Error("Invalid event structure");
21-
}
22-
entries.push({
23-
Source: snsMessage.Source,
24-
DetailType: snsMessage.DetailType,
25-
Detail: JSON.stringify({ message: snsMessage.Message }),
26-
EventBusName: snsMessage.Type === 'control' ? controlEventBusArn : dataEventBusArn
27-
});
28-
} catch (err) {
29-
console.error("Event validation failed", err);
1+
const { EventBridgeClient, PutEventsCommand } = require('@aws-sdk/client-eventbridge');
2+
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
3+
4+
const eventBridge = new EventBridgeClient({});
5+
const sqs = new SQSClient({});
6+
7+
const DATA_PLANE_EVENT_BUS_ARN = process.env.DATA_PLANE_EVENT_BUS_ARN;
8+
const CONTROL_PLANE_EVENT_BUS_ARN = process.env.CONTROL_PLANE_EVENT_BUS_ARN;
9+
const DLQ_URL = process.env.DLQ_URL;
10+
const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
11+
const MAX_RETRIES = 3;
12+
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
13+
14+
/**
 * Checks that a parsed SNS message carries every field the publisher requires.
 *
 * Example of a valid event:
 *   {
 *     "type": "data",
 *     "version": "0.1",
 *     "source": "manual",
 *     "detailtype": "testEvent",
 *     "message": "Hello World"
 *   }
 *
 * Only the presence of the keys is checked, not their values; `detailtype` is not required.
 *
 * @param {*} event - Parsed message payload (any JSON value).
 * @returns {boolean} true when `event` is a non-null object that owns all required keys.
 */
function validateEvent(event) {
  // JSON.parse can yield null or a primitive; those are never valid events, and calling a
  // method on null/undefined would throw instead of reporting "invalid".
  if (event === null || typeof event !== 'object') {
    return false;
  }

  const requiredFields = ['type', 'version', 'source', 'message'];
  // Borrow hasOwnProperty from Object.prototype so payloads that shadow it (or have no
  // prototype at all) are still checked correctly.
  return requiredFields.every((field) => Object.prototype.hasOwnProperty.call(event, field));
}
26+
27+
/**
 * Publishes events to a single EventBridge bus in batches and reports the ones that were
 * not delivered.
 *
 * Events are sent in slices of at most EVENTBRIDGE_MAX_BATCH_SIZE. Each slice is attempted up
 * to MAX_RETRIES times; only errors flagged `retryable` are retried, with an exponential
 * backoff (100ms, 200ms, ...) between attempts.
 *
 * @param {object[]} events - Validated events; each becomes one PutEvents entry.
 * @param {string} eventBusArn - Target bus, passed as `EventBusName` on every entry.
 * @returns {Promise<object[]>} Events that were rejected by EventBridge, failed with a
 *   non-retryable error, or were still failing once the retry budget was exhausted.
 */
async function sendToEventBridge(events, eventBusArn) {
  const failedEvents = [];

  for (let i = 0; i < events.length; i += EVENTBRIDGE_MAX_BATCH_SIZE) {
    const batch = events.slice(i, i + EVENTBRIDGE_MAX_BATCH_SIZE);
    const entries = batch.map((event) => ({
      Source: 'custom.event',
      DetailType: event.type,
      Detail: JSON.stringify(event),
      EventBusName: eventBusArn,
    }));

    // `settled` becomes true once the batch has a final outcome (sent, or failed for good).
    let settled = false;
    for (let attempts = 0; attempts < MAX_RETRIES && !settled; attempts++) {
      try {
        const response = await eventBridge.send(new PutEventsCommand({ Entries: entries }));
        settled = true;

        if (response.FailedEntryCount) {
          // Relies on response entries being in the same order as the request entries, so
          // `idx` identifies the original event in `batch`.
          response.Entries.forEach((entry, idx) => {
            if (entry.ErrorCode) {
              console.warn(`Event failed with error: ${entry.ErrorCode}`);
              failedEvents.push(batch[idx]);
            }
          });
        }
      } catch (error) {
        console.error(`EventBridge send error: ${error}`);

        if (!error.retryable) {
          failedEvents.push(...batch);
          settled = true;
        } else if (attempts + 1 < MAX_RETRIES) {
          // Back off only when another attempt will actually follow.
          console.warn(`Retrying after backoff: attempt ${attempts + 1}`);
          await new Promise((res) => setTimeout(res, 2 ** attempts * 100));
        }
      }
    }

    if (!settled) {
      // Every attempt hit a retryable error. Report the batch as failed so the caller can
      // dead-letter it; previously these events were dropped without a trace.
      console.error(`Giving up on batch after ${MAX_RETRIES} attempts`);
      failedEvents.push(...batch);
    }
  }

  return failedEvents;
}
69+
70+
/**
 * Forwards events that could not be published to the dead-letter queue at DLQ_URL.
 *
 * Each event is JSON-serialised into its own SQS message and the messages are sent one at a
 * time. A rejected send propagates to the caller and stops the remaining sends.
 *
 * @param {Array<*>} events - Events to dead-letter; an empty array is a no-op.
 * @returns {Promise<void>}
 */
async function sendToDLQ(events) {
  // Nothing to dead-letter: return before logging so successful invocations do not emit a
  // misleading "Sending 0 failed events" warning.
  if (events.length === 0) {
    return;
  }

  console.warn(`Sending ${events.length} failed events to DLQ`);

  for (const event of events) {
    await sqs.send(new SendMessageCommand({ QueueUrl: DLQ_URL, MessageBody: JSON.stringify(event) }));
  }
}
58-
};
5977

60-
function validateEvent(event) {
61-
// My test event looks like
62-
// {
63-
// "Type":"data",
64-
// "Version":"0.1",
65-
// "Source":"manual",
66-
// "DetailType":"testEvent",
67-
// "Message":"Hello World"
68-
// }
69-
return event && event.Type && event.Source && event.DetailType && event.Message && event.Version;
70-
}
78+
exports.handler = async (snsEvent) => {
79+
// console.info(`Received SNS event with ${snsEvent.Records.length} records.`);
80+
81+
if (THROTTLE_DELAY_MS > 0) {
82+
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
83+
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
84+
}
85+
86+
const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
87+
const validEvents = records.filter(validateEvent);
88+
const invalidEvents = records.filter(event => !validateEvent(event));
89+
90+
// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
91+
92+
if (invalidEvents.length) await sendToDLQ(invalidEvents);
93+
94+
const dataEvents = validEvents.filter(event => event.type === 'data');
95+
const controlEvents = validEvents.filter(event => event.type === 'control');
96+
97+
// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
98+
99+
const failedDataEvents = await sendToEventBridge(dataEvents, DATA_PLANE_EVENT_BUS_ARN);
100+
const failedControlEvents = await sendToEventBridge(controlEvents, CONTROL_PLANE_EVENT_BUS_ARN);
101+
102+
await sendToDLQ([...failedDataEvents, ...failedControlEvents]);
103+
};

infrastructure/modules/eventpub/lambda_function.tf

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@ resource "aws_lambda_function" "main" {
1313
source_code_hash = data.archive_file.lambda.output_base64sha256
1414

1515
logging_config {
16-
application_log_level = "INFO"
16+
application_log_level = var.log_level
1717
log_format = "JSON"
1818
log_group = aws_cloudwatch_log_group.lambda.name
19-
system_log_level = "WARN"
19+
system_log_level = var.log_level
2020
}
2121

2222
environment {
2323
variables = {
2424
DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn
2525
CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn
26+
DLQ_URL = aws_sqs_queue.dlq.url
27+
THROTTLE_DELAY_MS = "0"
2628
}
2729
}
2830
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# SQS queue named "<csi>-dlq", encrypted with the KMS key supplied in var.kms_key_arn.
# Elsewhere in this change its ARN is granted sqs:SendMessage in the Lambda IAM policy
# (sid "DLQPutMessage") and its URL is passed to the Lambda as the DLQ_URL environment variable.
resource "aws_sqs_queue" "dlq" {
2+
name = "${local.csi}-dlq"
3+
4+
kms_master_key_id = var.kms_key_arn
5+
}

infrastructure/modules/eventpub/variables.tf

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,7 @@ variable "enable_sns_delivery_logging" {
8383
default = false
8484
}
8585

86-
variable "sns_delivery_logging_bucket" {
87-
type = string
88-
description = "An S3 bucket name if event caching is enabled"
89-
default = ""
9086

91-
validation {
92-
condition = var.enable_sns_delivery_logging == false || length(var.sns_delivery_logging_bucket) > 1
93-
error_message = "If delivery logs are required, an S3 bucket name must be provided"
94-
}
95-
}
9687

9788
variable "sns_success_logging_sample_percent" {
9889
type = number
@@ -103,7 +94,7 @@ variable "sns_success_logging_sample_percent" {
10394
variable "log_level" {
10495
type = string
10596
description = "The log level to be used in lambda functions within the component. Any log with a lower severity than the configured value will not be logged: https://docs.python.org/3/library/logging.html#levels"
106-
default = "INFO"
97+
default = "WARN"
10798
}
10899

109100
variable "event_cache_expiry_days" {

0 commit comments

Comments
 (0)