Skip to content

Commit 939e2e7

Browse files
committed
Review yesterdays work. Leave some todo notes for the business logic
1 parent 7083f88 commit 939e2e7

5 files changed

Lines changed: 53 additions & 40 deletions

File tree

packages/postDatedLambda/src/businessLogic.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {PostDatedSQSMessageWithExistingRecords} from "./types"
88
*
99
* @param logger - The AWS Lambda Powertools logger instance
1010
* @param message - The SQS message containing post-dated prescription data and existing records
11-
* @returns Promise<boolean> - true if processing succeeded, false if it failed
11+
* @returns Promise<boolean> - true if the post-dated prescription has matured, and false otherwise
1212
*/
1313
export async function processMessage(
1414
logger: Logger,

packages/postDatedLambda/src/databaseClient.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ function createPrescriptionLookupKey(prescriptionID: string, pharmacyODSCode: st
1919

2020
/**
2121
* Query the PrescriptionStatusUpdates table for all records matching a given prescription ID and ODS code.
22+
* There should always be at least one result, but there may be multiple if the prescription has been
23+
* updated multiple times.
2224
*
2325
* @param prescriptionID - The prescription ID to query for
2426
* @param pharmacyODSCode - The pharmacy ODS code to query for
2527
* @param logger - The AWS Lambda Powertools logger instance
26-
* @returns Array of PSUDataItem records matching the prescription ID
28+
* @returns Array of PSUDataItem records matching the prescription ID. Sorted by LastModified descending.
2729
*/
2830
export async function getExistingRecordsByPrescriptionID(
2931
prescriptionID: string,
@@ -76,8 +78,8 @@ export async function getExistingRecordsByPrescriptionID(
7678
recordCount: items.length
7779
})
7880

79-
// Sort by LastModified ascending so most recent is last
80-
items.sort((a, b) => new Date(a.LastModified).valueOf() - new Date(b.LastModified).valueOf())
81+
// Sort by LastModified ascending so most recent is first
82+
items.sort((a, b) => new Date(b.LastModified).valueOf() - new Date(a.LastModified).valueOf())
8183

8284
return items
8385
} catch (err) {
@@ -96,7 +98,8 @@ export async function getExistingRecordsByPrescriptionID(
9698
*
9799
* @param postDatedItems - Array of post-dated prescription data items
98100
* @param logger - The AWS Lambda Powertools logger instance
99-
* @returns Array of objects containing both the post-dated data and existing records
101+
* @returns Array of objects containing both the post-dated data and existing records.
102+
* Existing records are sorted by LastModified descending.
100103
*/
101104
export async function fetchExistingRecordsForPrescriptions(
102105
postDatedItems: Array<PostDatedNotifyDataItem>,
@@ -106,6 +109,8 @@ export async function fetchExistingRecordsForPrescriptions(
106109
prescriptionCount: postDatedItems.length
107110
})
108111

112+
// The data table is indexed by both PrescriptionID and PharmacyODSCode, so build a map keyed by these
113+
// in combination. Should avoid duplicate queries this way.
109114
const uniquePrescriptionLookups = new Map<string, {prescriptionID: string; pharmacyODSCode: string}>()
110115
for (const item of postDatedItems) {
111116
const lookupKey = createPrescriptionLookupKey(item.PrescriptionID, item.PharmacyODSCode)
@@ -117,10 +122,7 @@ export async function fetchExistingRecordsForPrescriptions(
117122
}
118123
}
119124

120-
// Create a map of prescription ID to existing records
121125
const existingRecordsMap = new Map<string, Array<PSUDataItem>>()
122-
123-
// Fetch existing records for each unique prescription ID
124126
await Promise.all(
125127
Array.from(uniquePrescriptionLookups.entries()).map(async ([lookupKey, {prescriptionID, pharmacyODSCode}]) => {
126128
try {
@@ -133,6 +135,7 @@ export async function fetchExistingRecordsForPrescriptions(
133135
error
134136
})
135137
// Store empty array on error to allow processing to continue
138+
// TODO: Make sure later, that if existingRecords is empty, we handle that
136139
existingRecordsMap.set(lookupKey, [])
137140
}
138141
})
@@ -183,10 +186,15 @@ export async function enrichMessagesWithExistingRecords(
183186
existingRecords: recordsMap.get(message.prescriptionData.PrescriptionID) ?? []
184187
}))
185188

186-
logger.info("Enriched messages with existing records from DynamoDB", {
187-
messageCount: messages.length,
188-
messagesWithRecords: enrichedMessages.filter((m) => m.existingRecords.length > 0).length
189-
})
189+
for (const msg of enrichedMessages) {
190+
logger.info("Prescription and most recent existing record", {
191+
prescriptionID: msg.prescriptionData.PrescriptionID,
192+
existingRecordCount: msg.existingRecords.length,
193+
mostRecentExistingRecord: msg.existingRecords.length > 0
194+
? msg.existingRecords[0]
195+
: null
196+
})
197+
}
190198

191199
return enrichedMessages
192200
}

packages/postDatedLambda/src/orchestration.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,39 @@ export async function processMessages(
2323
): Promise<BatchProcessingResult> {
2424
if (messages.length === 0) {
2525
logger.info("No messages to process in batch")
26-
return {successful: [], failed: []}
26+
return {maturedPrescriptionUpdates: [], immaturePrescriptionUpdates: []}
2727
}
2828

2929
// Enrich messages with existing records from DynamoDB
3030
const enrichedMessages = await enrichMessagesWithExistingRecords(messages, logger)
3131

32-
const successful: Array<PostDatedSQSMessage> = []
33-
const failed: Array<PostDatedSQSMessage> = []
32+
const maturedPrescriptionUpdates: Array<PostDatedSQSMessage> = []
33+
const immaturePrescriptionUpdates: Array<PostDatedSQSMessage> = []
3434

3535
for (const message of enrichedMessages) {
3636
try {
3737
const success = await processMessage(logger, message)
3838
if (success) {
39-
successful.push(message)
39+
maturedPrescriptionUpdates.push(message)
4040
} else {
41-
failed.push(message)
41+
immaturePrescriptionUpdates.push(message)
4242
}
4343
} catch (error) {
4444
logger.error("Error processing message", {
4545
messageId: message.MessageId,
4646
error
4747
})
48-
failed.push(message)
48+
immaturePrescriptionUpdates.push(message)
4949
}
5050
}
5151

5252
logger.info("Batch processing complete", {
5353
totalMessages: messages.length,
54-
successfulCount: successful.length,
55-
failedCount: failed.length
54+
maturedPrescriptionUpdatesCount: maturedPrescriptionUpdates.length,
55+
immaturePrescriptionUpdatesCount: immaturePrescriptionUpdates.length
5656
})
5757

58-
return {successful, failed}
58+
return {maturedPrescriptionUpdates, immaturePrescriptionUpdates}
5959
}
6060

6161
/**

packages/postDatedLambda/src/sqs.ts

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import {PostDatedNotifyDataItem} from "@psu-common/commonTypes"
1313
import {BatchProcessingResult, PostDatedSQSMessage, ReceivedPostDatedSQSResult} from "./types"
1414

1515
const sqs = new SQSClient({region: process.env.AWS_REGION})
16-
const FAILED_MESSAGE_VISIBILITY_TIMEOUT = 300 // 5 minutes in seconds
1716

1817
/**
1918
* Get the SQS queue URL from environment variables.
@@ -210,34 +209,39 @@ export async function removeSQSMessages(
210209
}
211210

212211
/**
213-
* Return failed messages to the queue with a visibility timeout.
212+
* Edit failed that are on the queue to update their visibility timeout.
214213
* This makes the messages invisible for the specified duration before they can be processed again.
214+
* This does not delete the messages, or post new ones; it only alters their visibility.
215215
*
216216
* @param logger - The logging object
217217
* @param messages - The messages that failed processing and should be returned to the queue
218-
* @param visibilityTimeoutSeconds - The time in seconds before messages become visible again (default: 300 = 5 minutes)
219218
*/
220219
export async function returnMessagesToQueue(
221220
logger: Logger,
222-
messages: Array<Message>,
223-
visibilityTimeoutSeconds = 300
221+
messages: Array<Message>
224222
): Promise<void> {
225223
if (messages.length === 0) {
226-
logger.info("No failed messages to return to queue")
224+
logger.info("No messages to return to queue")
227225
return
228226
}
229227

230228
const sqsUrl = getQueueUrl(logger)
231229

230+
// TODO: Each message needs to have an appropriate visibility timeout based on when it is due to be retried.
231+
// For now, use a fixed 5 minute timeout for all messages.
232+
const visibilityTimeoutSeconds = 300
232233
const entries = messages.map((m) => ({
233234
Id: m.MessageId!,
234235
ReceiptHandle: m.ReceiptHandle!,
235236
VisibilityTimeout: visibilityTimeoutSeconds
236237
}))
237238

238239
logger.info(
239-
`Returning messages to queue with ${visibilityTimeoutSeconds}s timeout`,
240-
{numberOfMessages: entries.length, messageIds: entries.map((e) => e.Id)}
240+
`Returning messages to queue with timeouts`,
241+
{
242+
numberOfMessages: entries.length,
243+
idAndTimeouts: entries.map((e) => ({id: e.Id, visibilityTimeout: e.VisibilityTimeout}))
244+
}
241245
)
242246

243247
const changeVisibilityCmd = new ChangeMessageVisibilityBatchCommand({
@@ -258,8 +262,8 @@ export async function returnMessagesToQueue(
258262

259263
/**
260264
* Handle the results of message processing:
261-
* - Delete successful messages from the queue
262-
* - Return failed messages to the queue with a visibility timeout
265+
* - Delete matured messages from the queue
266+
* - Return immature messages to the queue with a visibility timeout
263267
* Does not alter the input result object, only performs side effects.
264268
*
265269
* @param result - The batch processing result
@@ -269,15 +273,16 @@ export async function handleProcessedMessages(
269273
result: BatchProcessingResult,
270274
logger: Logger
271275
): Promise<void> {
272-
const {successful, failed} = result
276+
const {maturedPrescriptionUpdates, immaturePrescriptionUpdates} = result
273277

274-
// Delete successful messages
275-
if (successful.length > 0) {
276-
await removeSQSMessages(logger, successful)
278+
// Delete matured messages
279+
if (maturedPrescriptionUpdates.length > 0) {
280+
// TODO: Also need to send messages to the notification queue here (do that first, then delete)
281+
await removeSQSMessages(logger, maturedPrescriptionUpdates)
277282
}
278283

279-
// Return failed messages to the queue with a 5 minute timeout
280-
if (failed.length > 0) {
281-
await returnMessagesToQueue(logger, failed, FAILED_MESSAGE_VISIBILITY_TIMEOUT)
284+
// Return failed messages to the queue
285+
if (immaturePrescriptionUpdates.length > 0) {
286+
await returnMessagesToQueue(logger, immaturePrescriptionUpdates)
282287
}
283288
}

packages/postDatedLambda/src/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ export interface PostDatedSQSMessageWithExistingRecords extends PostDatedSQSMess
3333
* Result of processing a batch of messages.
3434
*/
3535
export interface BatchProcessingResult {
36-
successful: Array<PostDatedSQSMessage>
37-
failed: Array<PostDatedSQSMessage>
36+
maturedPrescriptionUpdates: Array<PostDatedSQSMessage>
37+
immaturePrescriptionUpdates: Array<PostDatedSQSMessage>
3838
}
3939

4040
/**

0 commit comments

Comments
 (0)