@@ -37,6 +37,51 @@ function chunkArray<T>(arr: Array<T>, size: number): Array<Array<T>> {
3737 return chunks
3838}
3939
40+ type DeduplicationSource = Pick < PSUDataItem , "PatientNHSNumber" | "PharmacyODSCode" >
41+ type NotificationMessage = NotifyDataItem | PostDatedNotifyDataItem
42+
43+ function buildSqsBatchEntries < T extends DeduplicationSource > (
44+ items : Array < T > ,
45+ requestId : string ,
46+ sqsSalt : string ,
47+ toMessageBody : ( item : T ) => NotificationMessage
48+ ) : Array < SQSBatchMessage > {
49+ return items . map ( ( item , idx ) => ( {
50+ Id : idx . toString ( ) ,
51+ MessageBody : JSON . stringify ( toMessageBody ( item ) ) ,
52+ MessageDeduplicationId : saltedHash ( `${ item . PatientNHSNumber } :${ item . PharmacyODSCode } ` , sqsSalt ) ,
53+ MessageGroupId : requestId ,
54+ MessageAttributes : {
55+ RequestId : {
56+ DataType : "String" ,
57+ StringValue : requestId
58+ }
59+ }
60+ } ) )
61+ }
62+
63+ /**
64+ * Sends entries to the SQS queue in batches of 10
65+ * @param entries
66+ * @param queueUrl
67+ * @param requestId
68+ * @param logger
69+ * @returns An array of the created MessageIds
70+ */
71+ async function sendEntriesToQueue (
72+ entries : Array < SQSBatchMessage > ,
73+ queueUrl : string ,
74+ requestId : string ,
75+ logger : Logger
76+ ) : Promise < Array < string > > {
77+ if ( ! entries . length ) {
78+ return [ ]
79+ }
80+
81+ const batches = chunkArray ( entries , 10 )
82+ return placeBatchInSQS ( batches , queueUrl , requestId , logger )
83+ }
84+
4085/**
4186 * Salts and hashes a string.
4287 *
@@ -60,10 +105,14 @@ export function saltedHash(
60105export async function getSaltValue ( logger : Logger ) : Promise < string > {
61106 let sqsSalt : string
62107
63- if ( process . env . SQS_SALT ) {
108+ if ( ! process . env . SQS_SALT ) {
109+ // No secret name configured at all, so fall back
110+ sqsSalt = fallbackSalt
111+ } else {
64112 try {
65113 // grab the secret, expecting JSON like { "salt": "string" }
66114 const secretJson = await getSecret ( process . env . SQS_SALT , { transform : "json" } )
115+ logger . info ( "Fetched SQS_SALT from Secrets Manager" , { secretJson} )
67116
68117 // must be a non‐null object with a string .salt
69118 if (
@@ -84,9 +133,6 @@ export async function getSaltValue(logger: Logger): Promise<string> {
84133 logger . error ( "Failed to fetch SQS_SALT from Secrets Manager, using DEV SALT" , { error} )
85134 sqsSalt = fallbackSalt
86135 }
87- } else {
88- // No secret name configured at all, so fall back
89- sqsSalt = fallbackSalt
90136 }
91137
92138 if ( sqsSalt === fallbackSalt ) {
@@ -98,12 +144,20 @@ export async function getSaltValue(logger: Logger): Promise<string> {
98144 return sqsSalt
99145}
100146
147+ /**
148+ * Places batches of messages into SQS
149+ * @param batches
150+ * @param sqsUrl
151+ * @param requestId
152+ * @param logger
153+ * @returns An array of the send MessageIds
154+ */
101155async function placeBatchInSQS (
102156 batches : Array < Array < SQSBatchMessage > > ,
103157 sqsUrl : string ,
104158 requestId : string ,
105159 logger : Logger
106- ) {
160+ ) : Promise < Array < string > > {
107161
108162 // Used for the return value
109163 let out : Array < string > = [ ]
@@ -126,18 +180,18 @@ async function placeBatchInSQS(
126180 } )
127181 const result = await sqs . send ( command )
128182 if ( result . Successful ?. length ) {
129- logger . info ( "Successfully sent a batch of prescriptions to the notifications SQS" , { result} )
183+ logger . info ( "Successfully sent a batch of prescriptions to the SQS" , { result, sqsUrl } )
130184
131185 // For each successful message, get its message ID. I don't think there will ever be undefined
132186 // actually in here, but the typing suggests that there could be so filter those out
133187 out . push ( ...result . Successful . map ( e => e . MessageId ) . filter ( msg_id => msg_id !== undefined ) )
134188 }
135189 // Some may succeed, and some may fail. So check for both
136190 if ( result . Failed ?. length ) {
137- throw new Error ( " Failed to send a batch of prescriptions to the notifications SQS" )
191+ throw new Error ( ` Failed to send a batch of prescriptions to the SQS {sqsUrl}` )
138192 }
139193 } catch ( error ) {
140- logger . error ( "Failed to send a batch of prescriptions to the notifications SQS" , { error} )
194+ logger . error ( "Failed to send a batch of prescriptions to the SQS" , { error, sqsUrl } )
141195 throw error
142196 }
143197 }
@@ -152,6 +206,8 @@ function norm(str: string) {
152206/**
153207 * Pushes an array of PSUDataItem to the notifications SQS queue
154208 * Uses SendMessageBatch to send up to 10 at a time
209+ * Contains the logic for filtering which items should be sent, based on
210+ * which sites/systems are enabled, and which statuses are to be sent
155211 *
156212 * @param requestId - The x-request-id header from the incoming event
157213 * @param data - Array of PSUDataItem to send to SQS
@@ -184,8 +240,6 @@ export async function pushPrescriptionToNotificationSQS(
184240 norm ( "ready to collect" ) ,
185241 norm ( "ready to collect - partial" )
186242 ] )
187- // Salt for the deduplication hash
188- const sqsSalt = await getSaltValue ( logger )
189243
190244 // Get only items which have the correct current statuses
191245 const candidates = allowedSitesAndSystemsData . filter (
@@ -205,89 +259,47 @@ export async function pushPrescriptionToNotificationSQS(
205259 const postDatedItems = changedStatus . filter ( item => item . PostDatedLastModifiedSetAt )
206260 const nonPostDatedItems = changedStatus . filter ( item => ! item . PostDatedLastModifiedSetAt )
207261
208- sendPostDatedItemsToSQS ( postDatedItems , requestId , logger )
209-
210- // Build SQS batch entries with FIFO parameters
211- const allEntries : Array < SQSBatchMessage > = nonPostDatedItems
212- . map ( ( item , idx ) => ( {
213- Id : idx . toString ( ) ,
214- // Only post the required information to SQS
215- MessageBody : JSON . stringify ( item as NotifyDataItem ) ,
216- // FIFO
217- // We dedupe on both nhs number and ods code
218- MessageDeduplicationId : saltedHash ( `${ item . PatientNHSNumber } :${ item . PharmacyODSCode } ` , sqsSalt ) ,
219- MessageGroupId : requestId ,
220- MessageAttributes : {
221- RequestId : {
222- DataType : "String" ,
223- StringValue : requestId
224- }
225- }
226- } ) )
227-
228- if ( ! allEntries . length ) {
229- // Carry on if we have no updates to make.
230- logger . info ( "No entries to post to the notifications SQS" )
231- return [ ]
232- }
262+ const postDatedMessageIds = sendItemsToSQS ( postDatedItems , postDatedSqsUrl , requestId , logger )
263+ const nonPostDatedMessageIds = sendItemsToSQS ( nonPostDatedItems , sqsUrl , requestId , logger )
233264
234265 logger . info (
235266 "The following patients will have prescription update app notifications requested" ,
236- { nhsNumbers : allowedSitesAndSystemsData . map ( e => e . current . PatientNHSNumber ) }
267+ { nhsNumbers : changedStatus . map ( e => e . PatientNHSNumber ) }
237268 )
238269
239- // Remove post-dated entries from the normal flow
240- const currentlyValidEntries = allEntries . filter ( entry => {
241- const body : NotifyDataItem = JSON . parse ( entry . MessageBody )
242- return ! ( "PostDatedLastModifiedSetAt" in body )
243- } )
244-
245- // SQS batch calls are limited to 10 messages per request, so chunk the data
246- const batches = chunkArray ( currentlyValidEntries , 10 )
247- const out = await placeBatchInSQS ( batches , sqsUrl , requestId , logger )
248-
249- return out
270+ return Promise . all ( [ postDatedMessageIds , nonPostDatedMessageIds ] )
271+ . then ( results => results . flat ( ) )
250272}
251273
252- // FIXME: Remove this function once post-dated updates are deprecated
253- async function sendPostDatedItemsToSQS (
254- postDatedItems : Array < PSUDataItem > ,
274+ /**
275+ *
276+ * @param items
277+ * @param sqsUrl
278+ * @param requestId
279+ * @param logger
280+ * @returns an array of the sent MessageIDs
281+ */
282+ async function sendItemsToSQS (
283+ items : Array < PSUDataItem > ,
284+ sqsUrl : string ,
255285 requestId : string ,
256286 logger : Logger
257- ) : Promise < void > {
258- if ( postDatedItems . length === 0 ) {
259- logger . info ( "No post-dated items to send to SQS" )
260- return
261- }
262-
263- if ( ! postDatedSqsUrl ) {
264- logger . error ( "Post-dated Notifications SQS URL not found in environment variables" )
265- throw new Error ( "Post-dated Notifications SQS URL not configured" )
287+ ) : Promise < Array < string > > {
288+ if ( items . length === 0 ) {
289+ logger . info ( "No items to send to SQS" , { sqsUrl} )
290+ return [ ]
266291 }
267292
268- logger . info ( `Placing ${ postDatedItems . length } post-dated entries into the post-dated SQS queue` )
293+ logger . info ( `Placing ${ items . length } entries into the SQS queue` , { sqsUrl } )
269294
270295 const sqsSalt = await getSaltValue ( logger )
271296
272- // This time, instead of posting NotifyDataItem, we use PostDatedNotifyDataItem
273- const allEntries : Array < SQSBatchMessage > = postDatedItems
274- . map ( ( item , idx ) => ( {
275- Id : idx . toString ( ) ,
276- // Only post the required information to SQS
277- MessageBody : JSON . stringify ( item as PostDatedNotifyDataItem ) ,
278- // FIFO
279- // We dedupe on both nhs number and ods code
280- MessageDeduplicationId : saltedHash ( `${ item . PatientNHSNumber } :${ item . PharmacyODSCode } ` , sqsSalt ) ,
281- MessageGroupId : requestId ,
282- MessageAttributes : {
283- RequestId : {
284- DataType : "String" ,
285- StringValue : requestId
286- }
287- }
288- } ) )
297+ const entries = buildSqsBatchEntries (
298+ items ,
299+ requestId ,
300+ sqsSalt ,
301+ item => item as PostDatedNotifyDataItem
302+ )
289303
290- // SQS batch calls are limited to 10 messages per request, so chunk the data
291- const batches = chunkArray ( allEntries , 10 )
292- await placeBatchInSQS ( batches , postDatedSqsUrl , requestId , logger )
304+ return await sendEntriesToQueue ( entries , sqsUrl , requestId , logger )
293305}
0 commit comments