Skip to content

Commit 1c8462d

Browse files
CCM-10258: Discrete Control and Data SNS Topics
1 parent 8c81268 commit 1c8462d

12 files changed

Lines changed: 88 additions & 39 deletions
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
resource "aws_cloudwatch_log_group" "sns_delivery_logging_failure" {
2-
count = var.enable_sns_delivery_logging ? 1 : 0
2+
for_each = var.enable_event_cache ? local.sns_topics : {}
33

44
# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
55
# (for failure logs)
6-
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}/Failure"
6+
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${contains(each.key, "data") ? "-data" : "-control"}/Failure"
77
kms_key_id = var.kms_key_arn
88
retention_in_days = var.log_retention_in_days
99
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
resource "aws_cloudwatch_log_group" "sns_delivery_logging_success" {
2-
count = var.enable_sns_delivery_logging ? 1 : 0
2+
for_each = var.enable_event_cache ? local.sns_topics : {}
33

44
# SNS doesn't allow specifying a log group and is derived as: sns/${region}/${account_id}/${name_of_sns_topic}/Failure
55
# (for failure logs)
6-
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}"
6+
name = "sns/${var.region}/${var.aws_account_id}/${local.csi}${contains(each.key, "data") ? "-data" : "-control"}"
77
kms_key_id = var.kms_key_arn
88
retention_in_days = var.log_retention_in_days
99
}

infrastructure/modules/eventpub/cloudwatch_metric_alarm_sns_delivery_failures.tf

Lines changed: 0 additions & 16 deletions
This file was deleted.

infrastructure/modules/eventpub/iam_policy_sns_delivery_logging_cloudwatch.tf

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,14 @@ data "aws_iam_policy_document" "sns_delivery_logging_cloudwatch" {
3535
]
3636

3737
resources = [
38-
aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn,
39-
"${aws_cloudwatch_log_group.sns_delivery_logging_success[0].arn}:log-stream:*",
40-
aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn,
41-
"${aws_cloudwatch_log_group.sns_delivery_logging_failure[0].arn}:log-stream:*",
38+
aws_cloudwatch_log_group.sns_delivery_logging_success_data[0].arn,
39+
"${aws_cloudwatch_log_group.sns_delivery_logging_success_data[0].arn}:log-stream:*",
40+
aws_cloudwatch_log_group.sns_delivery_logging_failure_data[0].arn,
41+
"${aws_cloudwatch_log_group.sns_delivery_logging_failure_data[0].arn}:log-stream:*",
42+
aws_cloudwatch_log_group.sns_delivery_logging_success_control[0].arn,
43+
"${aws_cloudwatch_log_group.sns_delivery_logging_success_control[0].arn}:log-stream:*",
44+
aws_cloudwatch_log_group.sns_delivery_logging_failure_control[0].arn,
45+
"${aws_cloudwatch_log_group.sns_delivery_logging_failure_control[0].arn}:log-stream:*",
4246
]
4347
}
4448
}

infrastructure/modules/eventpub/lambda/eventpub/src/__tests__/index.test.js

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,34 @@ const invalidCloudEvent = {
3333
data: {}
3434
};
3535

36+
const DATA_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:data-topic';
37+
const CONTROL_TOPIC_ARN = 'arn:aws:sns:eu-west-2:123456789012:control-topic';
38+
const DATA_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/data';
39+
const CONTROL_PLANE_EVENT_BUS_ARN = 'arn:aws:events:eu-west-2:123456789012:event-bus/control';
40+
const DLQ_URL = 'https://sqs.eu-west-2.amazonaws.com/123456789012/dlq';
41+
3642
const snsEvent = {
3743
Records: [
38-
{ Sns: { Message: JSON.stringify(validCloudEvent) } }
44+
{ Sns: { Message: JSON.stringify(validCloudEvent), TopicArn: DATA_TOPIC_ARN } }
3945
]
4046
};
4147

4248
const snsEventInvalid = {
4349
Records: [
44-
{ Sns: { Message: JSON.stringify(invalidCloudEvent) } }
50+
{ Sns: { Message: JSON.stringify(invalidCloudEvent), TopicArn: DATA_TOPIC_ARN } }
4551
]
4652
};
4753

4854
describe('SNS to EventBridge Lambda', () => {
4955
beforeEach(() => {
5056
eventBridgeMock.reset();
5157
sqsMock.reset();
58+
process.env.DATA_TOPIC_ARN = DATA_TOPIC_ARN;
59+
process.env.CONTROL_TOPIC_ARN = CONTROL_TOPIC_ARN;
60+
process.env.DATA_PLANE_EVENT_BUS_ARN = DATA_PLANE_EVENT_BUS_ARN;
61+
process.env.CONTROL_PLANE_EVENT_BUS_ARN = CONTROL_PLANE_EVENT_BUS_ARN;
62+
process.env.DLQ_URL = DLQ_URL;
63+
process.env.THROTTLE_DELAY_MS = '0';
5264
});
5365

5466
test('Valid event is sent to the correct EventBridge bus', async () => {
@@ -57,6 +69,9 @@ describe('SNS to EventBridge Lambda', () => {
5769
await handler(snsEvent);
5870

5971
expect(eventBridgeMock.calls()).toHaveLength(1);
72+
// Check correct bus
73+
const callInput = eventBridgeMock.calls()[0].args[0].input;
74+
expect(callInput.Entries[0].EventBusName).toBe(DATA_PLANE_EVENT_BUS_ARN);
6075
});
6176

6277
test('Invalid event is sent to DLQ', async () => {
@@ -65,9 +80,10 @@ describe('SNS to EventBridge Lambda', () => {
6580
await handler(snsEventInvalid);
6681

6782
expect(sqsMock.calls()).toHaveLength(1);
83+
const callInput = sqsMock.calls()[0].args[0].input;
84+
expect(callInput.QueueUrl).toBe(DLQ_URL);
6885
});
6986

70-
7187
test('Retries on EventBridge failure and sends failed events to DLQ', async () => {
7288
eventBridgeMock
7389
.on(PutEventsCommand)
@@ -85,10 +101,32 @@ describe('SNS to EventBridge Lambda', () => {
85101
process.env.THROTTLE_DELAY_MS = '500';
86102
jest.useFakeTimers();
87103

104+
eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
105+
88106
const handlerPromise = handler(snsEvent);
107+
expect(setTimeout).toHaveBeenCalledWith(expect.any(Function), 500);
89108
jest.advanceTimersByTime(500);
90109
await handlerPromise;
91110

92111
jest.useRealTimers();
93112
});
113+
114+
test('Routes control events to control event bus', async () => {
115+
const controlEvent = {
116+
...validCloudEvent,
117+
type: "control"
118+
};
119+
const snsEventControl = {
120+
Records: [
121+
{ Sns: { Message: JSON.stringify(controlEvent), TopicArn: CONTROL_TOPIC_ARN } }
122+
]
123+
};
124+
eventBridgeMock.on(PutEventsCommand).resolves({ FailedEntryCount: 0, Entries: [{}] });
125+
126+
await handler(snsEventControl);
127+
128+
expect(eventBridgeMock.calls()).toHaveLength(1);
129+
const callInput = eventBridgeMock.calls()[0].args[0].input;
130+
expect(callInput.Entries[0].EventBusName).toBe(CONTROL_PLANE_EVENT_BUS_ARN);
131+
});
94132
});

infrastructure/modules/eventpub/lambda/eventpub/src/index.js

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ const THROTTLE_DELAY_MS = parseInt(process.env.THROTTLE_DELAY_MS || '0', 10);
1111
const MAX_RETRIES = 3;
1212
const EVENTBRIDGE_MAX_BATCH_SIZE = 10;
1313

14+
const DATA_TOPIC_ARN = process.env.DATA_TOPIC_ARN;
15+
const CONTROL_TOPIC_ARN = process.env.CONTROL_TOPIC_ARN;
16+
1417
function validateEvent(event) {
1518
// CloudEvents v1.0 schema validation (supplier-status)
1619
const requiredFields = [
@@ -107,16 +110,26 @@ exports.handler = async (snsEvent) => {
107110
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
108111
}
109112

110-
const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
111-
const validEvents = records.filter(validateEvent);
112-
const invalidEvents = records.filter(event => !validateEvent(event));
113+
// Map each record to its event and topicArn
114+
const parsedRecords = snsEvent.Records.map(record => ({
115+
event: JSON.parse(record.Sns.Message),
116+
topicArn: record.Sns.TopicArn
117+
}));
118+
119+
const validRecords = parsedRecords.filter(({ event }) => validateEvent(event));
120+
const invalidEvents = parsedRecords.filter(({ event }) => !validateEvent(event)).map(({ event }) => event);
113121

114122
// console.info(`Valid events: ${validEvents.length}, Invalid events: ${invalidEvents.length}`);
115123

116124
if (invalidEvents.length) await sendToDLQ(invalidEvents);
117125

118-
const dataEvents = validEvents.filter(event => event.type === 'data');
119-
const controlEvents = validEvents.filter(event => event.type === 'control');
126+
// Classify by topicArn
127+
const dataEvents = validRecords
128+
.filter(({ topicArn }) => topicArn === DATA_TOPIC_ARN)
129+
.map(({ event }) => event);
130+
const controlEvents = validRecords
131+
.filter(({ topicArn }) => topicArn === CONTROL_TOPIC_ARN)
132+
.map(({ event }) => event);
120133

121134
// console.info(`Data events: ${dataEvents.length}, Control events: ${controlEvents.length}`);
122135

infrastructure/modules/eventpub/lambda_function.tf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ resource "aws_lambda_function" "main" {
2121

2222
environment {
2323
variables = {
24+
DATA_TOPIC_ARN = aws_sns_topic.data.arn
25+
CONTROL_TOPIC_ARN = aws_sns_topic.control.arn
2426
DATA_PLANE_EVENT_BUS_ARN = var.data_plane_bus_arn
2527
CONTROL_PLANE_EVENT_BUS_ARN = var.control_plane_bus_arn
2628
DLQ_URL = aws_sqs_queue.dlq.url

infrastructure/modules/eventpub/lambda_permissions_sns_event_cache.tf renamed to infrastructure/modules/eventpub/lambda_permissions_sns_lambda.tf

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
resource "aws_lambda_permission" "sns_lambda" {
2-
statement_id = "AllowExecutionFromSNS"
2+
for_each = local.sns_topics
3+
4+
statement_id = "AllowExecutionFromSNS${title(each.key)}Topic"
35
action = "lambda:InvokeFunction"
46
function_name = aws_lambda_function.main.function_name
57
principal = "sns.amazonaws.com"
6-
source_arn = aws_sns_topic.main.arn
8+
source_arn = aws_sns_topic[each.key].arn
79
}

infrastructure/modules/eventpub/locals.tf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,8 @@ locals {
1919
Name = local.csi
2020
},
2121
)
22-
22+
sns_topics = {
23+
data = "${local.csi}-data"
24+
control = "${local.csi}-control"
25+
}
2326
}

infrastructure/modules/eventpub/sns_topic.tf renamed to infrastructure/modules/eventpub/sns_topic_main.tf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
resource "aws_sns_topic" "main" {
2-
name = local.csi
2+
for_each = local.sns_topics
3+
name = each.value
34
kms_master_key_id = var.kms_key_arn
45

56
application_failure_feedback_role_arn = var.enable_sns_delivery_logging == true ? aws_iam_role.sns_delivery_logging_role[0].arn : null

0 commit comments

Comments
 (0)