Skip to content

Commit eec2536

Browse files
committed
feat(queues): add ability to override concurrency limit via API and dashboard
1 parent f8977a7 commit eec2536

16 files changed

Lines changed: 935 additions & 28 deletions

File tree

apps/webapp/app/presenters/v3/QueueListPresenter.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ export class QueueListPresenter extends BasePresenter {
114114
name: true,
115115
orderableName: true,
116116
concurrencyLimit: true,
117+
concurrencyLimitBase: true,
118+
concurrencyLimitOverriddenAt: true,
119+
concurrencyLimitOverriddenBy: true,
117120
type: true,
118121
paused: true,
119122
},
@@ -135,6 +138,17 @@ export class QueueListPresenter extends BasePresenter {
135138
),
136139
]);
137140

141+
// Manually "join" the overridden users because there is no way to implement the relationship
142+
// in prisma without adding a foreign key constraint
143+
const overriddenByIds = queues.map((q) => q.concurrencyLimitOverriddenBy).filter(Boolean);
144+
const overriddenByUsers = await this._replica.user.findMany({
145+
where: {
146+
id: { in: overriddenByIds },
147+
},
148+
});
149+
150+
const overriddenByMap = new Map(overriddenByUsers.map((u) => [u.id, u]));
151+
138152
// Transform queues to include running and queued counts
139153
return queues.map((queue) =>
140154
toQueueItem({
@@ -144,6 +158,11 @@ export class QueueListPresenter extends BasePresenter {
144158
running: results[1][queue.name] ?? 0,
145159
queued: results[0][queue.name] ?? 0,
146160
concurrencyLimit: queue.concurrencyLimit ?? null,
161+
concurrencyLimitBase: queue.concurrencyLimitBase ?? null,
162+
concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null,
163+
concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy
164+
? overriddenByMap.get(queue.concurrencyLimitOverriddenBy) ?? null
165+
: null,
147166
paused: queue.paused,
148167
})
149168
);

apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
22
import { engine } from "~/v3/runEngine.server";
33
import { BasePresenter } from "./basePresenter.server";
4-
import { type TaskQueueType } from "@trigger.dev/database";
4+
import { TaskQueue, User, type TaskQueueType } from "@trigger.dev/database";
55
import { assertExhaustive } from "@trigger.dev/core";
66
import { determineEngineVersion } from "~/v3/engineVersion.server";
7-
import { type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3";
7+
import { type Prettify, type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3";
88
import { PrismaClientOrTransaction } from "@trigger.dev/database";
99

10+
export type FoundQueue = Prettify<
11+
Omit<TaskQueue, "concurrencyLimitOverriddenBy"> & {
12+
concurrencyLimitOverriddenBy?: User | null;
13+
}
14+
>;
15+
1016
/**
1117
* Shared queue lookup logic used by both QueueRetrievePresenter and PauseQueueService
1218
*/
@@ -16,22 +22,50 @@ export async function getQueue(
1622
queue: RetrieveQueueParam
1723
) {
1824
if (typeof queue === "string") {
19-
return prismaClient.taskQueue.findFirst({
25+
return joinQueueWithUser(
26+
prismaClient,
27+
await prismaClient.taskQueue.findFirst({
28+
where: {
29+
friendlyId: queue,
30+
runtimeEnvironmentId: environment.id,
31+
},
32+
})
33+
);
34+
}
35+
36+
const queueName =
37+
queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name;
38+
return joinQueueWithUser(
39+
prismaClient,
40+
await prismaClient.taskQueue.findFirst({
2041
where: {
21-
friendlyId: queue,
42+
name: queueName,
2243
runtimeEnvironmentId: environment.id,
2344
},
24-
});
45+
})
46+
);
47+
}
48+
49+
async function joinQueueWithUser(
50+
prismaClient: PrismaClientOrTransaction,
51+
queue?: TaskQueue | null
52+
): Promise<FoundQueue | undefined> {
53+
if (!queue) return undefined;
54+
if (!queue.concurrencyLimitOverriddenBy) {
55+
return {
56+
...queue,
57+
concurrencyLimitOverriddenBy: undefined,
58+
};
2559
}
2660

27-
const queueName =
28-
queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name;
29-
return prismaClient.taskQueue.findFirst({
30-
where: {
31-
name: queueName,
32-
runtimeEnvironmentId: environment.id,
33-
},
61+
const user = await prismaClient.user.findFirst({
62+
where: { id: queue.concurrencyLimitOverriddenBy },
3463
});
64+
65+
return {
66+
...queue,
67+
concurrencyLimitOverriddenBy: user,
68+
};
3569
}
3670

3771
export class QueueRetrievePresenter extends BasePresenter {
@@ -75,6 +109,9 @@ export class QueueRetrievePresenter extends BasePresenter {
75109
running: results[1]?.[queue.name] ?? 0,
76110
queued: results[0]?.[queue.name] ?? 0,
77111
concurrencyLimit: queue.concurrencyLimit ?? null,
112+
concurrencyLimitBase: queue.concurrencyLimitBase ?? null,
113+
concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null,
114+
concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null,
78115
paused: queue.paused,
79116
}),
80117
};
@@ -104,6 +141,9 @@ export function toQueueItem(data: {
104141
running: number;
105142
queued: number;
106143
concurrencyLimit: number | null;
144+
concurrencyLimitBase: number | null;
145+
concurrencyLimitOverriddenAt: Date | null;
146+
concurrencyLimitOverriddenBy: User | null;
107147
paused: boolean;
108148
}): QueueItem & { releaseConcurrencyOnWaitpoint: boolean } {
109149
return {
@@ -113,9 +153,22 @@ export function toQueueItem(data: {
113153
type: queueTypeFromType(data.type),
114154
running: data.running,
115155
queued: data.queued,
116-
concurrencyLimit: data.concurrencyLimit,
117156
paused: data.paused,
157+
concurrencyLimit: data.concurrencyLimit,
158+
concurrency: {
159+
current: data.concurrencyLimit,
160+
base: data.concurrencyLimitBase,
161+
override: data.concurrencyLimitOverriddenAt ? data.concurrencyLimit : null,
162+
overriddenBy: toQueueConcurrencyOverriddenBy(data.concurrencyLimitOverriddenBy),
163+
overriddenAt: data.concurrencyLimitOverriddenAt,
164+
},
118165
// TODO: This needs to be removed but keeping this here for now to avoid breaking existing clients
119166
releaseConcurrencyOnWaitpoint: true,
120167
};
121168
}
169+
170+
function toQueueConcurrencyOverriddenBy(user: User | null) {
171+
if (!user) return null;
172+
173+
return user.displayName ?? user.name ?? null;
174+
}

0 commit comments

Comments
 (0)