Skip to content

Commit 07b6172

Browse files
committed
refactor: create QueueWorker and QueueDispatcher
1 parent 14ddbbe commit 07b6172

6 files changed

Lines changed: 556 additions & 140 deletions

File tree

src/queue-dispatcher.ts

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import { Log } from 'ts-tiny-log';
2+
3+
import { TaskPersistence } from './task-persistence';
4+
import { Task } from './task';
5+
import { Worker } from './worker';
6+
import { Pool } from './pool';
7+
import { QueueOptions } from './queue';
8+
9+
export interface DispatcherConfig<TIn, TOut> {
10+
/**
11+
* Queue name. Must be unique
12+
*/
13+
name: string;
14+
15+
/**
16+
* Worker entry file. Must be a relative/absolute path/file
17+
*/
18+
workerEntry: string;
19+
20+
/**
21+
* Number of workers
22+
*/
23+
nWorkers: number;
24+
25+
/**
26+
* Worker class type
27+
*/
28+
workerType: typeof Worker;
29+
30+
/**
31+
* Error handler
32+
*/
33+
error?: (err: Error) => void | Promise<void>;
34+
35+
/**
36+
* Queue options for retry and polling
37+
*/
38+
options: QueueOptions<TIn, TOut>;
39+
}
40+
41+
/**
42+
* QueueDispatcher class - handles task distribution on the main thread
43+
*
44+
* @typeParam TIn Queue task input type
45+
* @typeParam TOut Queue task output type
46+
*/
47+
export class QueueDispatcher<TIn, TOut> {
48+
/* eslint-disable-next-line no-console */
49+
protected log: Log = <Log><unknown>console;
50+
51+
protected tasks: TaskPersistence<TIn, TOut>;
52+
protected pool: Pool<TIn, TOut>;
53+
protected config: DispatcherConfig<TIn, TOut>;
54+
55+
/**
56+
* Runs on: Main
57+
*
58+
* @param tasks Task persistence instance
59+
* @param config Dispatcher configuration
60+
*/
61+
public constructor(
62+
tasks: TaskPersistence<TIn, TOut>,
63+
config: DispatcherConfig<TIn, TOut>
64+
) {
65+
this.tasks = tasks;
66+
this.config = config;
67+
}
68+
69+
/**
70+
* Initialize the dispatcher by setting up the worker pool
71+
*
72+
* Runs on: Main
73+
*/
74+
public async initialize(): Promise<void> {
75+
this.pool = new Pool<TIn, TOut>({
76+
nWorkers: this.config.nWorkers,
77+
workerEntry: this.config.workerEntry,
78+
queueName: this.config.name,
79+
workerType: this.config.workerType,
80+
error: this.config.error,
81+
});
82+
83+
await this.pool.initialize();
84+
}
85+
86+
/**
87+
* Start the polling loop to distribute tasks to workers
88+
*
89+
* Runs on: Main
90+
*/
91+
public start(): void {
92+
if (!this.pool) {
93+
throw new Error('Dispatcher not initialized. Call initialize() first.');
94+
}
95+
96+
let worker: Worker<TIn, TOut> | null;
97+
98+
setInterval(async () => {
99+
if (!worker) {
100+
worker = await this.pool.reserve();
101+
}
102+
103+
if (worker) {
104+
const task = this.tasks.dequeue();
105+
106+
if (task) {
107+
// Check if task has expired
108+
if (this.isTaskExpired(task)) {
109+
if (task.reject) {
110+
task.reject(new Error('Task expired'));
111+
}
112+
113+
// TODO: Emit options.error?
114+
}
115+
else {
116+
// Apply retry logic to the task's reject callback
117+
this.applyRetryHandler(task);
118+
worker.startTask(task);
119+
worker = null;
120+
}
121+
}
122+
}
123+
}, this.config.options.pollingRate);
124+
}
125+
126+
/**
127+
* Apply retry handling to a task's reject callback
128+
*
129+
* Runs on: Main
130+
*
131+
* @param task Task to apply retry handler to
132+
*/
133+
protected applyRetryHandler(task: Task<TIn, TOut>): void {
134+
const originalReject = task.reject;
135+
const attemptCount = task.attempts ?? 0;
136+
137+
task.reject = (error: Error) => {
138+
if (attemptCount < this.config.options.maxAttempts - 1) {
139+
// Calculate scheduled start time with retry delay
140+
if (this.config.options.retryDelayMs > 0) {
141+
const scheduledAt = new Date(
142+
Date.now() + this.config.options.retryDelayMs
143+
);
144+
145+
if (task.schedule) {
146+
task.schedule.scheduledAt = scheduledAt;
147+
}
148+
else {
149+
task.schedule = { scheduledAt };
150+
}
151+
}
152+
153+
// Re-enqueue the task with incremented attempt count and delay
154+
this.tasks.enqueue({
155+
...task,
156+
attempts: attemptCount + 1,
157+
reject: originalReject,
158+
});
159+
}
160+
else {
161+
// Max retries exceeded, call the original reject
162+
if (originalReject) {
163+
originalReject(error);
164+
}
165+
}
166+
};
167+
}
168+
169+
/**
170+
* Check if a task has expired
171+
*
172+
* Runs on: Main
173+
*
174+
* @param task Task to check
175+
* @return Returns true if the task has expired, false otherwise
176+
*/
177+
protected isTaskExpired(task: Task<TIn, TOut>): boolean {
178+
if (task.schedule && task.schedule.expiresAt) {
179+
return new Date() > task.schedule.expiresAt;
180+
}
181+
182+
return false;
183+
}
184+
}

src/queue-worker.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { Log } from 'ts-tiny-log';
2+
import { parentPort, workerData } from 'worker_threads';
3+
4+
import { ParentMessage, ParentMessageTypes, WorkerSpawnData } from './worker';
5+
import { ParentThread } from './parent';
6+
import type { QueueCallback, ErrorHandler } from './queue';
7+
8+
export interface WorkerConfig<TIn, TOut> {
9+
/**
10+
* Function to run on worker startup
11+
*/
12+
startup: (data: WorkerSpawnData) => Promise<void>;
13+
14+
/**
15+
* Function to run for the queue task
16+
*/
17+
callback: QueueCallback<TIn, TOut>;
18+
19+
/**
20+
* Function to run on error
21+
*/
22+
error?: ErrorHandler;
23+
24+
/**
25+
* Class for communicating Worker -> Parent
26+
*/
27+
parentType: typeof ParentThread;
28+
}
29+
30+
/**
31+
* QueueWorker class - handles task execution on worker threads
32+
*
33+
* @typeParam TIn Queue task input type
34+
* @typeParam TOut Queue task output type
35+
*/
36+
export class QueueWorker<TIn, TOut> {
37+
/* eslint-disable-next-line no-console */
38+
protected log: Log = <Log><unknown>console;
39+
40+
protected parent: ParentThread;
41+
protected config: WorkerConfig<TIn, TOut>;
42+
43+
/**
44+
* Runs on: Worker
45+
*
46+
* @param config Worker configuration
47+
*/
48+
public constructor(config: WorkerConfig<TIn, TOut>) {
49+
this.config = config;
50+
this.parent = new config.parentType();
51+
}
52+
53+
/**
54+
* Initialize the worker with startup callback
55+
*
56+
* Runs on: Worker
57+
*/
58+
public async initialize(): Promise<void> {
59+
await this.config.startup(workerData);
60+
if (parentPort) {
61+
this.listenForWork();
62+
}
63+
this.parent.workerStarted();
64+
}
65+
66+
/**
67+
* Start listening for work from the parent thread
68+
*
69+
* Runs on: Worker
70+
*/
71+
protected listenForWork(): void {
72+
parentPort.on('message', (message: ParentMessage) => {
73+
if (message.type === ParentMessageTypes.START_TASK) {
74+
this.config.callback(message.data)
75+
.then((response?: TOut) => this.parent.taskFinished(response))
76+
.catch((err?: Error) => this.parent.taskFailed(err));
77+
}
78+
});
79+
}
80+
}

0 commit comments

Comments
 (0)