|
| 1 | +import type { PrismaClient } from "@prisma/client"; |
1 | 2 | import type { PrismaQueue } from "src/index"; |
2 | 3 | import { PrismaJob } from "src/PrismaJob"; |
3 | 4 | import { debug, serializeError, waitFor } from "src/utils"; |
4 | 5 | import { |
5 | 6 | createEmailQueue, |
| 7 | + createEmailQueueNonTransactional, |
6 | 8 | DEFAULT_POLL_INTERVAL, |
7 | 9 | prisma, |
8 | 10 | waitForNextEvent, |
@@ -857,3 +859,213 @@ describe("PrismaQueue", () => { |
857 | 859 | }); |
858 | 860 | }); |
859 | 861 | }); |
| 862 | + |
| 863 | +describe("PrismaQueue (transactional: false)", () => { |
| 864 | + describe("dequeue", () => { |
| 865 | + let queue: PrismaQueue<EmailJobPayload, EmailJobResult>; |
| 866 | + beforeAll(() => { |
| 867 | + queue = createEmailQueueNonTransactional(); |
| 868 | + }); |
| 869 | + beforeEach(async () => { |
| 870 | + await prisma.queueJob.deleteMany(); |
| 871 | + void queue.start(); |
| 872 | + }); |
| 873 | + afterEach(() => { |
| 874 | + void queue.stop(); |
| 875 | + }); |
| 876 | + it("should properly dequeue a successful job", async () => { |
| 877 | + queue.worker = vi.fn(async (_job, _client) => { |
| 878 | + await waitFor(200); |
| 879 | + return { code: "200" }; |
| 880 | + }); |
| 881 | + const job = await queue.enqueue({ email: "foo@bar.com" }); |
| 882 | + await waitForNextJob(queue); |
| 883 | + expect(queue.worker).toHaveBeenCalledTimes(1); |
| 884 | + const record = await job.fetch(); |
| 885 | + expect(record.finishedAt).toBeInstanceOf(Date); |
| 886 | + }); |
| 887 | + it("should properly dequeue a failed job", async () => { |
| 888 | + let error: Error | null = null; |
| 889 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 890 | + queue.worker = vi.fn(async (_job) => { |
| 891 | + error = new Error("failed"); |
| 892 | + throw error; |
| 893 | + }); |
| 894 | + const job = await queue.enqueue({ email: "foo@bar.com" }); |
| 895 | + await waitForNextJob(queue); |
| 896 | + expect(queue.worker).toHaveBeenCalledTimes(1); |
| 897 | + const record = await job.fetch(); |
| 898 | + expect(record.finishedAt).toBeNull(); |
| 899 | + expect(record.error).toEqual(serializeError(error)); |
| 900 | + }); |
| 901 | + it("should provide PrismaClient with $transaction to worker", async () => { |
| 902 | + let clientHasTransaction = false; |
| 903 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 904 | + queue.worker = vi.fn(async (_job: EmailJob, client: PrismaClient) => { |
| 905 | + clientHasTransaction = typeof client.$transaction === "function"; |
| 906 | + return { code: "200" }; |
| 907 | + }); |
| 908 | + await queue.enqueue({ email: "foo@bar.com" }); |
| 909 | + await waitForNextJob(queue); |
| 910 | + expect(clientHasTransaction).toBe(true); |
| 911 | + }); |
| 912 | + it("should reset processedAt on retry", async () => { |
| 913 | + const retryQueue = createEmailQueueNonTransactional({ |
| 914 | + maxAttempts: 3, |
| 915 | + pollInterval: 200, |
| 916 | + retryStrategy: ({ attempts, maxAttempts }) => { |
| 917 | + if (maxAttempts !== null && attempts >= maxAttempts) return null; |
| 918 | + return 100 * attempts; |
| 919 | + }, |
| 920 | + }); |
| 921 | + await prisma.queueJob.deleteMany(); |
| 922 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 923 | + retryQueue.worker = vi.fn(async () => { |
| 924 | + throw new Error("always fails"); |
| 925 | + }); |
| 926 | + const job = await retryQueue.enqueue({ email: "retry@test.com" }); |
| 927 | + void retryQueue.start(); |
| 928 | + // Wait for first dequeue |
| 929 | + await waitForNextJob(retryQueue); |
| 930 | + // After first failure, processedAt should be reset |
| 931 | + const record = await job.fetch(); |
| 932 | + expect(record.processedAt).toBeNull(); |
| 933 | + expect(record.finishedAt).toBeNull(); |
| 934 | + // Wait for remaining retries |
| 935 | + await waitForNthJob(retryQueue, 2); |
| 936 | + await retryQueue.stop(); |
| 937 | + expect(retryQueue.worker).toHaveBeenCalledTimes(3); |
| 938 | + // After max attempts, job should be finished |
| 939 | + const finalRecord = await job.fetch(); |
| 940 | + expect(finalRecord.finishedAt).toBeInstanceOf(Date); |
| 941 | + }); |
| 942 | + it("should work with deleteOn: success", async () => { |
| 943 | + const deleteQueue = createEmailQueueNonTransactional({ deleteOn: "success" }); |
| 944 | + await prisma.queueJob.deleteMany(); |
| 945 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 946 | + deleteQueue.worker = vi.fn(async () => { |
| 947 | + return { code: "200" }; |
| 948 | + }); |
| 949 | + const job = await deleteQueue.enqueue({ email: "foo@bar.com" }); |
| 950 | + void deleteQueue.start(); |
| 951 | + await waitForNextJob(deleteQueue); |
| 952 | + await deleteQueue.stop(); |
| 953 | + const record = await job.fetch(); |
| 954 | + expect(record).toBeNull(); |
| 955 | + }); |
| 956 | + it("should properly update job progress", async () => { |
| 957 | + queue.worker = vi.fn(async (job: EmailJob) => { |
| 958 | + await job.progress(50); |
| 959 | + throw new Error("failed"); |
| 960 | + }); |
| 961 | + const job = await queue.enqueue({ email: "foo@bar.com" }); |
| 962 | + void queue.start(); |
| 963 | + await waitForNextJob(queue); |
| 964 | + const record = await job.fetch(); |
| 965 | + expect(record.progress).toBe(50); |
| 966 | + }); |
| 967 | + it("should properly re-enqueue a recurring cron job", async () => { |
| 968 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 969 | + queue.worker = vi.fn(async () => { |
| 970 | + return { code: "200" }; |
| 971 | + }); |
| 972 | + await queue.schedule( |
| 973 | + { key: "nt-email-schedule", cron: "5 5 * * *", runAt: new Date() }, |
| 974 | + { email: "foo@bar.com" }, |
| 975 | + ); |
| 976 | + void queue.start(); |
| 977 | + await waitForNextEvent(queue, "enqueue"); |
| 978 | + const jobs = await prisma.queueJob.findMany({ where: { key: "nt-email-schedule" } }); |
| 979 | + expect(jobs.length).toBe(2); |
| 980 | + const nextJob = jobs[1]; |
| 981 | + expect(nextJob?.runAt.getHours()).toBe(5); |
| 982 | + expect(nextJob?.runAt.getMinutes()).toBe(5); |
| 983 | + }); |
| 984 | + afterAll(() => { |
| 985 | + void queue.stop(); |
| 986 | + }); |
| 987 | + }); |
| 988 | + |
| 989 | + describe("requeueStale", () => { |
| 990 | + const STALE_QUEUE_NAME = "stale-test-queue"; |
| 991 | + it("should recover stuck jobs", async () => { |
| 992 | + const queue = createEmailQueueNonTransactional({ pollInterval: 200, name: STALE_QUEUE_NAME }); |
| 993 | + await prisma.queueJob.deleteMany(); |
| 994 | + // Simulate a stuck job: processedAt set, finishedAt null |
| 995 | + const staleDate = new Date(Date.now() - 60_000); // 60 seconds ago |
| 996 | + await prisma.queueJob.create({ |
| 997 | + data: { |
| 998 | + queue: STALE_QUEUE_NAME, |
| 999 | + payload: { email: "stuck@test.com" }, |
| 1000 | + processedAt: staleDate, |
| 1001 | + attempts: 1, |
| 1002 | + runAt: staleDate, |
| 1003 | + }, |
| 1004 | + }); |
| 1005 | + // Verify it's not available for dequeue |
| 1006 | + const sizeBefore = await queue.size(true); |
| 1007 | + expect(sizeBefore).toBe(0); |
| 1008 | + // Requeue stale jobs older than 30s |
| 1009 | + const count = await queue.requeueStale({ olderThanMs: 30_000 }); |
| 1010 | + expect(count).toBe(1); |
| 1011 | + // Now it should be available |
| 1012 | + const sizeAfter = await queue.size(true); |
| 1013 | + expect(sizeAfter).toBe(1); |
| 1014 | + }); |
| 1015 | + it("should not requeue recently claimed jobs", async () => { |
| 1016 | + const queue = createEmailQueueNonTransactional({ pollInterval: 200, name: STALE_QUEUE_NAME }); |
| 1017 | + await prisma.queueJob.deleteMany(); |
| 1018 | + // Simulate a recently claimed job |
| 1019 | + await prisma.queueJob.create({ |
| 1020 | + data: { |
| 1021 | + queue: STALE_QUEUE_NAME, |
| 1022 | + payload: { email: "recent@test.com" }, |
| 1023 | + processedAt: new Date(), // just now |
| 1024 | + attempts: 1, |
| 1025 | + runAt: new Date(), |
| 1026 | + }, |
| 1027 | + }); |
| 1028 | + const count = await queue.requeueStale({ olderThanMs: 30_000 }); |
| 1029 | + expect(count).toBe(0); |
| 1030 | + }); |
| 1031 | + }); |
| 1032 | + |
| 1033 | + describe("rolling upgrade safety", () => { |
| 1034 | + const UPGRADE_QUEUE_NAME = "upgrade-test-queue"; |
| 1035 | + it("should not pick up rows with non-null processedAt", async () => { |
| 1036 | + const queue = createEmailQueueNonTransactional({ pollInterval: 100, name: UPGRADE_QUEUE_NAME }); |
| 1037 | + await prisma.queueJob.deleteMany(); |
| 1038 | + // eslint-disable-next-line @typescript-eslint/require-await |
| 1039 | + queue.worker = vi.fn(async () => { |
| 1040 | + return { code: "200" }; |
| 1041 | + }); |
| 1042 | + // Simulate an old in-flight row (from before the processedAt guard) |
| 1043 | + await prisma.queueJob.create({ |
| 1044 | + data: { |
| 1045 | + queue: UPGRADE_QUEUE_NAME, |
| 1046 | + payload: { email: "old@test.com" }, |
| 1047 | + processedAt: new Date(Date.now() - 120_000), |
| 1048 | + attempts: 1, |
| 1049 | + runAt: new Date(Date.now() - 120_000), |
| 1050 | + }, |
| 1051 | + }); |
| 1052 | + // Also add a normal job |
| 1053 | + await queue.enqueue({ email: "new@test.com" }); |
| 1054 | + void queue.start(); |
| 1055 | + await waitForNextJob(queue); |
| 1056 | + await queue.stop(); |
| 1057 | + // Only the new job should have been processed |
| 1058 | + expect(queue.worker).toHaveBeenCalledTimes(1); |
| 1059 | + expect(queue.worker).toHaveBeenCalledWith( |
| 1060 | + expect.objectContaining({ payload: { email: "new@test.com" } }), |
| 1061 | + expect.any(Object), |
| 1062 | + ); |
| 1063 | + // The old stale row should still be there, untouched |
| 1064 | + const staleJobs = await prisma.queueJob.findMany({ |
| 1065 | + where: { payload: { equals: { email: "old@test.com" } } }, |
| 1066 | + }); |
| 1067 | + expect(staleJobs.length).toBe(1); |
| 1068 | + expect(staleJobs[0]?.finishedAt).toBeNull(); |
| 1069 | + }); |
| 1070 | + }); |
| 1071 | +}); |
0 commit comments