-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdatabaseClient.ts
More file actions
111 lines (103 loc) · 3.63 KB
/
databaseClient.ts
File metadata and controls
111 lines (103 loc) · 3.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import {Logger} from "@aws-lambda-powertools/logger"
import {
DynamoDBClient,
GetItemCommand,
GetItemCommandInput,
QueryCommand,
QueryCommandInput,
TransactionCanceledException,
TransactWriteItem,
TransactWriteItemsCommand
} from "@aws-sdk/client-dynamodb"
import {marshall, unmarshall} from "@aws-sdk/util-dynamodb"
import {PSUDataItem} from "@PrescriptionStatusUpdate_common/commonTypes"
import {Timeout} from "./timeoutUtils"
// DynamoDB client created once at module load with default configuration; the functions
// below that talk to DynamoDB send their commands through this single instance.
const client = new DynamoDBClient()
// Name of the table to read from and write to: the TABLE_NAME environment variable when it
// is set, otherwise the literal "PrescriptionStatusUpdates".
const tableName = process.env.TABLE_NAME ?? "PrescriptionStatusUpdates"
/**
 * Builds a single DynamoDB transactional write containing one conditional Put per data item.
 *
 * Each Put targets `tableName` and carries a condition requiring that neither `TaskID` nor
 * `PrescriptionID` already exists on the stored item, so the write is rejected rather than
 * overwriting an existing record. `ReturnValuesOnConditionCheckFailure: "ALL_OLD"` asks
 * DynamoDB to hand back the conflicting item when the condition fails.
 *
 * @param dataItems - Items to write; each one is marshalled into DynamoDB attribute values.
 * @param logger - Logger used to record that the command is being built.
 * @returns The command, ready to be sent; nothing is sent here.
 */
function createTransactionCommand(dataItems: Array<PSUDataItem>, logger: Logger): TransactWriteItemsCommand {
  logger.info("Creating transaction command to write data items.")

  const putOperations: Array<TransactWriteItem> = []
  for (const dataItem of dataItems) {
    putOperations.push({
      Put: {
        TableName: tableName,
        Item: marshall(dataItem),
        ConditionExpression: "attribute_not_exists(TaskID) AND attribute_not_exists(PrescriptionID)",
        ReturnValuesOnConditionCheckFailure: "ALL_OLD"
      }
    })
  }

  return new TransactWriteItemsCommand({TransactItems: putOperations})
}
/**
 * Writes the given data items to DynamoDB in one transaction.
 *
 * @param dataItems - Items to persist; they are turned into a transactional write command.
 * @param logger - Logger used for progress and error reporting.
 * @returns `true` when the transaction was sent successfully, `false` when sending failed
 *   for any reason other than a cancelled transaction. (The declared `Timeout` member of
 *   the return type is never produced by this function itself.)
 * @throws TransactionCanceledException - Re-thrown after logging its cancellation reasons,
 *   so callers can inspect why the transaction was cancelled.
 */
export async function persistDataItems(dataItems: Array<PSUDataItem>, logger: Logger): Promise<boolean | Timeout> {
  const writeCommand = createTransactionCommand(dataItems, logger)

  try {
    logger.info("Sending TransactWriteItemsCommand to DynamoDB.", {command: writeCommand})
    await client.send(writeCommand)
    logger.info("TransactWriteItemsCommand sent to DynamoDB successfully.", {command: writeCommand})
  } catch (err) {
    // Anything other than a cancelled transaction is reported as a plain failure.
    if (!(err instanceof TransactionCanceledException)) {
      logger.error("Error sending TransactWriteItemsCommand to DynamoDB.", {error: err})
      return false
    }

    // Cancelled transactions are surfaced to the caller together with their reasons.
    logger.error("DynamoDB transaction cancelled due to conditional check failure.", {reasons: err.CancellationReasons})
    throw err
  }

  return true
}
/**
 * Reports whether a record with the given prescription ID and task ID is stored in the table.
 *
 * Performs a GetItem lookup keyed on `PrescriptionID` and `TaskID` (both sent as strings).
 *
 * @param prescriptionID - Value of the `PrescriptionID` key attribute to look up.
 * @param taskID - Value of the `TaskID` key attribute to look up.
 * @param logger - Logger used for progress and error reporting.
 * @returns `true` when DynamoDB returns an item for the key; `false` when no item is
 *   returned or when the lookup itself fails (the error is logged, not thrown).
 */
export async function checkPrescriptionRecordExistence(
  prescriptionID: string,
  taskID: string,
  logger: Logger
): Promise<boolean> {
  logger.info("Checking if prescription record exists in DynamoDB.", {prescriptionID}, {taskID})

  const lookup: GetItemCommandInput = {
    TableName: tableName,
    Key: {
      PrescriptionID: {S: prescriptionID},
      TaskID: {S: taskID}
    }
  }

  try {
    const response = await client.send(new GetItemCommand(lookup))
    logger.info("Query successful.", {result: response})
    // An absent Item means no record matched the key.
    return Boolean(response?.Item)
  } catch (err) {
    logger.error("Error querying DynamoDB.", {error: err})
    return false
  }
}
/**
 * Finds the most recently modified other update for the same prescription line item.
 *
 * Queries every record stored under `currentItem.PrescriptionID` whose `LineItemID` equals
 * `currentItem.LineItemID`, following pagination until DynamoDB reports no further pages.
 * Records with the same `TaskID` as `currentItem` are skipped, and of the remainder the one
 * with the latest `LastModified` timestamp is returned.
 *
 * When several candidates share the latest timestamp, the one returned last by DynamoDB
 * wins. Candidates whose `LastModified` cannot be parsed as a date rank below every
 * parseable one, so they are only returned when no candidate has a valid timestamp.
 *
 * @param currentItem - The item whose predecessor is wanted.
 * @returns The latest other item for the line item, or `undefined` when there is none.
 * @throws Any error raised by the DynamoDB client; nothing is caught here.
 */
export async function getPreviousItem(currentItem: PSUDataItem): Promise<PSUDataItem | undefined> {
  // Expression parameters replace the legacy KeyConditions/QueryFilter inputs. Marshalling
  // the values as one object also avoids calling marshall() on bare primitives.
  // NOTE(review): primitive support in marshall() appears to depend on the SDK version — confirm.
  const query: QueryCommandInput = {
    TableName: tableName,
    KeyConditionExpression: "PrescriptionID = :prescriptionID",
    FilterExpression: "LineItemID = :lineItemID",
    ExpressionAttributeValues: marshall({
      ":prescriptionID": currentItem.PrescriptionID,
      ":lineItemID": currentItem.LineItemID
    })
  }

  // Unparseable timestamps become -Infinity so comparisons stay well defined.
  const toTimestamp = (item: PSUDataItem): number => {
    const time = new Date(item.LastModified).valueOf()
    return Number.isNaN(time) ? Number.NEGATIVE_INFINITY : time
  }

  let previousItem: PSUDataItem | undefined
  let previousTimestamp = Number.NEGATIVE_INFINITY
  let lastEvaluatedKey: QueryCommandInput["ExclusiveStartKey"]

  do {
    if (lastEvaluatedKey) {
      query.ExclusiveStartKey = lastEvaluatedKey
    }
    const result = await client.send(new QueryCommand(query))

    for (const rawItem of result.Items ?? []) {
      const item = unmarshall(rawItem) as PSUDataItem
      // TaskID cannot be excluded in the query itself (presumably because it is a key
      // attribute, which filters may not reference, and key conditions have no "not equal"),
      // so the current item is dropped here instead.
      if (item.TaskID === currentItem.TaskID) {
        continue
      }
      const timestamp = toTimestamp(item)
      // ">=" lets a later item win ties, matching a stable ascending sort followed by pop().
      if (timestamp >= previousTimestamp) {
        previousItem = item
        previousTimestamp = timestamp
      }
    }

    lastEvaluatedKey = result.LastEvaluatedKey
  } while (lastEvaluatedKey)

  return previousItem
}