Skip to content

Commit 46dd8ec

Browse files
committed
Partially implement the business logic
1 parent f0d010f commit 46dd8ec

4 files changed

Lines changed: 105 additions & 34 deletions

File tree

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,40 @@
11
import {Logger} from "@aws-lambda-powertools/logger"
22

3-
import {PostDatedSQSMessageWithExistingRecords} from "./types"
3+
import {PSUDataItem} from "@psu-common/commonTypes"
44

5+
import {PostDatedSQSMessageWithExistingRecords, PostDatedProcessingResult} from "./types"
6+
7+
// defaults to false
58
const POST_DATED_OVERRIDE = process.env.POST_DATED_OVERRIDE === "true"
6-
const POST_DATED_OVERRIDE_VALUE = process.env.POST_DATED_OVERRIDE_VALUE === "true"
9+
10+
// set from environment variable POST_DATED_OVERRIDE_VALUE
11+
const POST_DATED_OVERRIDE_VALUE_ENV = process.env.POST_DATED_OVERRIDE_VALUE ?? "ignore"
12+
let POST_DATED_OVERRIDE_VALUE: PostDatedProcessingResult
13+
switch (POST_DATED_OVERRIDE_VALUE_ENV.toLowerCase()) {
14+
case "matured":
15+
POST_DATED_OVERRIDE_VALUE = PostDatedProcessingResult.MATURED
16+
break
17+
case "immature":
18+
POST_DATED_OVERRIDE_VALUE = PostDatedProcessingResult.IMMATURE
19+
break
20+
default:
21+
POST_DATED_OVERRIDE_VALUE = PostDatedProcessingResult.IGNORE
22+
break
23+
}
24+
25+
export function getMostRecentRecord(
26+
existingRecords: Array<PSUDataItem>
27+
): PSUDataItem {
28+
return existingRecords.reduce((latest, record) => {
29+
const latestTimestamp = latest.PostDatedLastModifiedSetAt
30+
? new Date(latest.PostDatedLastModifiedSetAt)
31+
: new Date(latest.LastModified)
32+
const recordTimestamp = record.PostDatedLastModifiedSetAt
33+
? new Date(record.PostDatedLastModifiedSetAt)
34+
: new Date(record.LastModified)
35+
return recordTimestamp > latestTimestamp ? record : latest
36+
}, existingRecords[0])
37+
}
738

839
/**
940
* Process a single post-dated prescription message.
@@ -13,15 +44,14 @@ const POST_DATED_OVERRIDE_VALUE = process.env.POST_DATED_OVERRIDE_VALUE === "tru
1344
* @param message - The SQS message containing post-dated prescription data and existing records
1445
* @returns Promise<boolean> - true if the post-dated prescription has matured, and false otherwise
1546
*/
16-
export async function processMessage(
47+
export function processMessage(
1748
logger: Logger,
1849
message: PostDatedSQSMessageWithExistingRecords
19-
): Promise<boolean> {
20-
logger.info("Processing post-dated prescription message (dummy)", {
50+
): string {
51+
logger.info("Processing post-dated prescription message", {
2152
messageId: message.MessageId,
2253
prescriptionData: message.prescriptionData,
23-
existingRecordsCount: message.existingRecords.length,
24-
existingRecordTaskIds: message.existingRecords.map((r) => r.TaskID)
54+
existingRecords: message.existingRecords
2555
})
2656
if (POST_DATED_OVERRIDE) {
2757
logger.info("Post-dated override is enabled, returning override value", {
@@ -30,24 +60,56 @@ export async function processMessage(
3060
return POST_DATED_OVERRIDE_VALUE
3161
}
3262

33-
// TODO: Implement actual business logic for post-dated prescription processing
3463
// The existingRecords array contains all records from the DynamoDB table
3564
// that match this prescription's PrescriptionID
3665

3766
// NOTE: It is technically possible for the array to be empty if no existing records are found
38-
// This SHOULD never happen in practice, but the code should handle it gracefully just in case
67+
// This SHOULD never happen in practice, but catch it anyway
68+
if (message.existingRecords.length === 0) {
69+
logger.error("No existing records found for post-dated prescription, cannot process", {
70+
badMessage: message
71+
})
72+
73+
// throw new Error("No existing records found for post-dated prescription") // maybe?
74+
return PostDatedProcessingResult.IGNORE
75+
}
76+
77+
// We only care about the most recent submission for this prescription
78+
// If PostDatedLastModifiedSetAt IS set, it is the timestamp we received the submission
79+
// If it is NOT set, then LastModified is the timestamp we received the submission
80+
const mostRecentRecord = getMostRecentRecord(message.existingRecords)
81+
82+
logger.info("Most recent NPPTS record for post-dated processing", {
83+
mostRecentRecord
84+
})
85+
86+
// Is it post-dated?
87+
if (!mostRecentRecord.PostDatedLastModifiedSetAt) {
88+
logger.info(
89+
"Most recent record is not marked as post-dated, and will have been processed " +
90+
"by the standard logic already. Marking as to be ignored by the post-dated notifications lambda."
91+
)
92+
return PostDatedProcessingResult.IGNORE
93+
}
94+
95+
// Is it still RTC?
96+
const mostRecentStatus = mostRecentRecord.Status.toLowerCase()
97+
const notifiableStatuses: Array<string> = ["ready to collect", "ready to collect - partial"]
98+
if (!notifiableStatuses.includes(mostRecentStatus)) {
99+
logger.info("Most recent status in the NPPTS data store is not a notifiable status, so will be ignored", {
100+
mostRecentStatus: mostRecentStatus
101+
})
102+
return PostDatedProcessingResult.IGNORE
103+
}
39104

40-
const mostRecentRecord = message.existingRecords.reduce((latest, record) => {
41-
return new Date(record.LastModified) > new Date(latest.LastModified) ? record : latest
42-
}, message.existingRecords[0])
43105
const mostRecentLastModified = new Date(mostRecentRecord.LastModified)
44-
const desiredTransitionTime = new Date(mostRecentRecord.PostDatedLastModifiedSetAt as string)
106+
const desiredTransitionTime = new Date(mostRecentRecord.PostDatedLastModifiedSetAt)
45107
const currentTime = new Date()
46108
logger.info("Post-dated prescription timing details", {
47109
mostRecentLastModified: mostRecentLastModified.toISOString(),
48110
desiredTransitionTime: desiredTransitionTime.toISOString(),
49111
currentTime: currentTime.toISOString()
50112
})
51113

52-
return true
114+
return PostDatedProcessingResult.MATURED
53115
}

packages/postDatedLambda/src/orchestration.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import {Logger} from "@aws-lambda-powertools/logger"
33
import {processMessage} from "./businessLogic"
44
import {enrichMessagesWithExistingRecords} from "./databaseClient"
55
import {receivePostDatedSQSMessages, reportQueueStatus, handleProcessedMessages} from "./sqs"
6-
import {BatchProcessingResult, PostDatedSQSMessage} from "./types"
6+
import {BatchProcessingResult, PostDatedProcessingResult, PostDatedSQSMessage} from "./types"
77

88
export const MAX_QUEUE_RUNTIME = 14 * 60 * 1000 // 14 minutes, to avoid Lambda timeout issues (timeout is 15 minutes)
99
const MIN_RECEIVED_THRESHOLD = 3 // If fewer than this number of messages are received, consider the queue empty
@@ -21,24 +21,22 @@ export async function processMessages(
2121
messages: Array<PostDatedSQSMessage>,
2222
logger: Logger
2323
): Promise<BatchProcessingResult> {
24-
if (messages.length === 0) {
25-
logger.info("No messages to process in batch")
26-
return {maturedPrescriptionUpdates: [], immaturePrescriptionUpdates: []}
27-
}
28-
2924
// Enrich messages with existing records from DynamoDB
3025
const enrichedMessages = await enrichMessagesWithExistingRecords(messages, logger)
3126

3227
const maturedPrescriptionUpdates: Array<PostDatedSQSMessage> = []
3328
const immaturePrescriptionUpdates: Array<PostDatedSQSMessage> = []
29+
const ignoredPrescriptionUpdates: Array<PostDatedSQSMessage> = []
3430

3531
for (const message of enrichedMessages) {
3632
try {
37-
const success = await processMessage(logger, message)
38-
if (success) {
33+
const action = processMessage(logger, message)
34+
if (action === PostDatedProcessingResult.MATURED) {
3935
maturedPrescriptionUpdates.push(message)
40-
} else {
36+
} else if (action === PostDatedProcessingResult.IMMATURE) {
4137
immaturePrescriptionUpdates.push(message)
38+
} else {
39+
ignoredPrescriptionUpdates.push(message)
4240
}
4341
} catch (error) {
4442
logger.error("Error processing message", {
@@ -52,10 +50,11 @@ export async function processMessages(
5250
logger.info("Batch processing complete", {
5351
totalMessages: messages.length,
5452
maturedPrescriptionUpdatesCount: maturedPrescriptionUpdates.length,
55-
immaturePrescriptionUpdatesCount: immaturePrescriptionUpdates.length
53+
immaturePrescriptionUpdatesCount: immaturePrescriptionUpdates.length,
54+
ignoredPrescriptionUpdatesCount: ignoredPrescriptionUpdates.length
5655
})
5756

58-
return {maturedPrescriptionUpdates, immaturePrescriptionUpdates}
57+
return {maturedPrescriptionUpdates, immaturePrescriptionUpdates, ignoredPrescriptionUpdates}
5958
}
6059

6160
/**

packages/postDatedLambda/src/sqs.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ export async function sendSQSMessagesToNotificationQueue(
243243
messages: Array<PostDatedSQSMessage>
244244
): Promise<Array<string>> {
245245
if (messages.length === 0) {
246-
logger.info("No matured post-dated messages to forward to notifications queue")
246+
// exit early so we don't send a SendMessageBatch with no entries
247+
logger.info("No messages to forward to notifications queue")
247248
return []
248249
}
249250

@@ -272,6 +273,7 @@ export async function removeSQSMessages(
272273
messages: Array<Message>
273274
): Promise<void> {
274275
if (messages.length === 0) {
276+
// exit early so we don't send a DeleteMessageBatch with no entries
275277
logger.info("No messages to delete")
276278
return
277279
}
@@ -319,6 +321,7 @@ export async function returnMessagesToQueue(
319321
messages: Array<Message>
320322
): Promise<void> {
321323
if (messages.length === 0) {
324+
// exit early so we don't send a ChangeMessageVisibilityBatch with no entries
322325
logger.info("No messages to return to queue")
323326
return
324327
}
@@ -377,16 +380,15 @@ export async function handleProcessedMessages(
377380
result: BatchProcessingResult,
378381
logger: Logger
379382
): Promise<void> {
380-
const {maturedPrescriptionUpdates, immaturePrescriptionUpdates} = result
383+
const {maturedPrescriptionUpdates, immaturePrescriptionUpdates, ignoredPrescriptionUpdates} = result
381384

382385
// Move matured messages to the notification queue and remove them from the post-dated queue
383-
if (maturedPrescriptionUpdates.length > 0) {
384-
await sendSQSMessagesToNotificationQueue(logger, maturedPrescriptionUpdates)
385-
await removeSQSMessages(logger, maturedPrescriptionUpdates)
386-
}
386+
await sendSQSMessagesToNotificationQueue(logger, maturedPrescriptionUpdates)
387+
await removeSQSMessages(logger, maturedPrescriptionUpdates)
387388

388389
// Return failed messages to the queue
389-
if (immaturePrescriptionUpdates.length > 0) {
390-
await returnMessagesToQueue(logger, immaturePrescriptionUpdates)
391-
}
390+
await returnMessagesToQueue(logger, immaturePrescriptionUpdates)
391+
392+
// Remove ignored messages from the queue, so they are not reprocessed
393+
await removeSQSMessages(logger, ignoredPrescriptionUpdates)
392394
}

packages/postDatedLambda/src/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,18 @@ export interface PostDatedSQSMessageWithExistingRecords extends PostDatedSQSMess
2929
existingRecords: Array<PSUDataItem>
3030
}
3131

32+
// Enum of strings, "matured", "immature", "ignore"
33+
export enum PostDatedProcessingResult {
34+
MATURED = "matured",
35+
IMMATURE = "immature",
36+
IGNORE = "ignore"
37+
}
38+
3239
/**
3340
* Result of processing a batch of messages.
3441
*/
3542
export interface BatchProcessingResult {
3643
maturedPrescriptionUpdates: Array<PostDatedSQSMessage>
3744
immaturePrescriptionUpdates: Array<PostDatedSQSMessage>
45+
ignoredPrescriptionUpdates: Array<PostDatedSQSMessage>
3846
}

0 commit comments

Comments
 (0)