-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathnhsNotifyLambda.ts
More file actions
132 lines (114 loc) · 3.8 KB
/
nhsNotifyLambda.ts
File metadata and controls
132 lines (114 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import {EventBridgeEvent} from "aws-lambda"
import {Logger} from "@aws-lambda-powertools/logger"
import {injectLambdaContext} from "@aws-lambda-powertools/logger/middleware"
import middy from "@middy/core"
import inputOutputLogger from "@middy/input-output-logger"
import errorHandler from "@nhs/fhir-middy-error-handler"
import {getParameter} from "@aws-lambda-powertools/parameters/ssm"
import {
addPrescriptionMessagesToNotificationStateStore,
checkCooldownForUpdate,
removeSQSMessages,
reportQueueStatus,
drainQueue,
handleNotifyRequests,
NotifyDataItemMessage
} from "./utils"
// Module-level Powertools logger, tagged with the "nhsNotify" service name.
// It is passed to the ./utils helpers and to the middy middleware in this file.
const logger = new Logger({serviceName: "nhsNotify"})
// Upper bound, in milliseconds, on how long one invocation keeps draining the queue.
const MAX_QUEUE_RUNTIME = 14 * 60 * 1000 // 14 minutes, to avoid Lambda timeout issues (timeout is 15 minutes)
/**
 * Handle one batch of queue messages end to end.
 *
 * Steps, in order:
 *  1. Run the cooldown check for every message (all checks are started together).
 *  2. Remove from SQS the messages the cooldown check rejected.
 *  3. Send notify requests for the remaining messages.
 *  4. For the messages reported back as processed, record them in the
 *     notification state store and remove them from SQS (done concurrently).
 *
 * If the notify call throws, the error is logged and the function returns
 * without removing those messages from the queue.
 *
 * @param messages - The batch pulled from the queue; an empty batch is a no-op.
 * @param routingId - Routing plan ID forwarded to the notify request helper.
 */
async function processBatch(
  messages: Array<NotifyDataItemMessage>,
  routingId: string
): Promise<void> {
  if (!messages.length) {
    logger.info("No messages to process")
    return
  }

  // One cooldown verdict per message, in the same order as `messages`.
  const verdicts = await Promise.all(
    messages.map((message) => checkCooldownForUpdate(logger, message.PSUDataItem))
  )

  // Split the batch in a single pass according to the verdicts.
  const toProcess: Array<NotifyDataItemMessage> = []
  const suppressed: Array<NotifyDataItemMessage> = []
  for (const [index, message] of messages.entries()) {
    if (verdicts[index]) {
      toProcess.push(message)
    } else {
      suppressed.push(message)
    }
  }

  logSuppression(suppressed.length, messages.length)
  if (suppressed.length > 0) {
    await removeSQSMessages(logger, suppressed)
  }

  // Send notifications; on failure nothing further is removed from the queue.
  let notified: Array<NotifyDataItemMessage>
  try {
    notified = await handleNotifyRequests(logger, routingId, toProcess)
  } catch (err) {
    logger.error("Notification request failed, will retry", {error: err, toProcess})
    return
  }

  if (notified.length === 0) {
    return
  }

  await Promise.all([
    addPrescriptionMessagesToNotificationStateStore(logger, notified),
    removeSQSMessages(logger, notified)
  ])
}
/**
 * Emit an info log describing how many messages the cooldown check suppressed.
 * Kept as a separate function to hold down the complexity of the caller.
 *
 * Logs nothing when no messages were suppressed (unless the count equals the total).
 *
 * @param suppressedCount - Number of messages rejected by the cooldown check.
 * @param total - Number of messages in the batch that was checked.
 */
function logSuppression(suppressedCount: number, total: number): void {
  let summary: string
  if (suppressedCount === total) {
    summary = "All messages suppressed by cooldown; nothing to notify"
  } else if (suppressedCount > 0) {
    summary = `Suppressed ${suppressedCount} messages due to cooldown`
  } else {
    // Nothing was suppressed, so there is nothing to report.
    return
  }

  logger.info(summary, {
    suppressedCount,
    totalFetched: total
  })
}
/**
 * Repeatedly pull batches from the queue and process them, stopping when the
 * queue reports empty or when MAX_QUEUE_RUNTIME milliseconds have elapsed
 * since this function started.
 *
 * The time limit is checked before each fetch, so a batch that is already
 * in flight is always processed to completion.
 *
 * @param routingId - Routing plan ID passed through to each batch.
 */
async function drainAndProcess(routingId: string): Promise<void> {
  const deadline = Date.now() + MAX_QUEUE_RUNTIME
  let queueEmpty = false

  do {
    if (Date.now() >= deadline) {
      logger.warn("drainAndProcess timed out; exiting before queue is empty",
        {maxRuntimeMilliseconds: MAX_QUEUE_RUNTIME}
      )
      return
    }

    const {messages, isEmpty} = await drainQueue(logger, 100)
    queueEmpty = isEmpty
    await processBatch(messages, routingId)
  } while (!queueEmpty)
}
/**
 * Entry point for the scheduled EventBridge invocation.
 *
 * Resolves the routing plan ID from the parameter named by the
 * NHS_NOTIFY_ROUTING_ID_PARAM environment variable, reports the queue status,
 * then drains and processes the queue.
 *
 * @param event - The triggering EventBridge event; only logged.
 * @throws Error when NHS_NOTIFY_ROUTING_ID_PARAM is unset or empty.
 * @throws Error when the parameter lookup yields no routing plan ID.
 */
export const lambdaHandler = async (
  event: EventBridgeEvent<string, string>
): Promise<void> => {
  const routingIdParam = process.env.NHS_NOTIFY_ROUTING_ID_PARAM
  if (!routingIdParam) {
    throw new Error("Environment not configured")
  }

  const routingId = await getParameter(routingIdParam)
  if (!routingId) {
    throw new Error("No Routing Plan ID found")
  }

  logger.info("NHS Notify lambda triggered by scheduler", {event, routingId})

  // Report first and only then drain: running these one after the other keeps
  // the queue report accurate.
  await reportQueueStatus(logger)
  await drainAndProcess(routingId)
}
/**
 * Exported Lambda handler: `lambdaHandler` wrapped with middy.
 *
 * Middleware is registered in this order:
 *  1. injectLambdaContext on the shared logger, with clearState enabled
 *  2. inputOutputLogger, routed through the shared logger at info level
 *  3. the FHIR middy error handler, using the shared logger
 */
export const handler = middy(lambdaHandler)
  .use(injectLambdaContext(logger, {clearState: true}))
  .use(
    inputOutputLogger({
      logger: (request) => logger.info("inputOutputLogger request", {request})
    })
  )
  .use(errorHandler({logger}))