Skip to content

Commit 76c6d5f

Browse files
committed
feat: implement select statement compliance
Signed-off-by: Christian Stewart <christian@aperture.us>
1 parent 1dcb86e commit 76c6d5f

8 files changed

Lines changed: 1113 additions & 158 deletions

File tree

builtin/builtin.ts

Lines changed: 286 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,24 @@ export const append = <T>(slice: Array<T>, ...elements: T[]): Array<T> => {
104104
slice.push(...elements);
105105
return slice;
106106
};
107+
108+
/**
109+
* Represents the result of a channel receive operation with 'ok' value
110+
*/
111+
export interface ChannelReceiveResult<T> {
112+
value: T; // Should be T | ZeroValue<T>
113+
ok: boolean;
114+
}
115+
116+
/**
117+
* Represents a result from a select operation
118+
*/
119+
export interface SelectResult<T> {
120+
value: T; // Should be T | ZeroValue<T>
121+
ok: boolean;
122+
id: number;
123+
}
124+
107125
/**
108126
* Represents a Go channel in TypeScript.
109127
* Supports asynchronous sending and receiving of values.
@@ -119,14 +137,53 @@ export interface Channel<T> {
119137
/**
120138
* Receives a value from the channel.
121139
* Returns a promise that resolves with the received value.
140+
* If the channel is closed, it throws an error.
122141
*/
123142
receive(): Promise<T>;
124143

144+
/**
145+
* Receives a value from the channel along with a boolean indicating
146+
* whether the channel is still open.
147+
* Returns a promise that resolves with {value, ok}.
148+
* - If channel is open and has data: {value: <data>, ok: true}
149+
* - If channel is closed and empty: {value: <zero value>, ok: false}
150+
* - If channel is closed but has remaining buffered data: {value: <data>, ok: true}
151+
*/
152+
receiveWithOk(): Promise<ChannelReceiveResult<T>>;
153+
125154
/**
126155
* Closes the channel.
127156
* No more values can be sent to a closed channel.
157+
* Receive operations on a closed channel return the zero value and ok=false.
128158
*/
129159
close(): void;
160+
161+
/**
162+
* Used in select statements to create a receive operation promise.
163+
* @param id An identifier for this case in the select statement
164+
* @returns Promise that resolves when this case is selected
165+
*/
166+
selectReceive(id: number): Promise<SelectResult<T>>;
167+
168+
/**
169+
* Used in select statements to create a send operation promise.
170+
* @param value The value to send
171+
* @param id An identifier for this case in the select statement
172+
* @returns Promise that resolves when this case is selected
173+
*/
174+
selectSend(value: T, id: number): Promise<SelectResult<boolean>>;
175+
176+
/**
177+
* Checks if the channel has data ready to be received without blocking.
178+
* Used for non-blocking select operations.
179+
*/
180+
canReceiveNonBlocking(): boolean;
181+
182+
/**
183+
* Checks if the channel can accept a send operation without blocking.
184+
* Used for non-blocking select operations.
185+
*/
186+
canSendNonBlocking(): boolean;
130187
}
131188

132189
// A simple implementation of buffered channels
@@ -136,9 +193,12 @@ class BufferedChannel<T> implements Channel<T> {
136193
private capacity: number;
137194
private senders: Array<(value: boolean) => void> = []; // Resolvers for blocked senders
138195
private receivers: Array<(value: T) => void> = []; // Resolvers for blocked receivers
196+
private receiversWithOk: Array<(result: ChannelReceiveResult<T>) => void> = []; // For receive with ok
197+
private zeroValue: T; // Store the zero value for the element type
139198

140-
constructor(capacity: number) {
199+
constructor(capacity: number, zeroValue: T) {
141200
this.capacity = capacity;
201+
this.zeroValue = zeroValue;
142202
}
143203

144204
async send(value: T): Promise<void> {
@@ -153,6 +213,13 @@ class BufferedChannel<T> implements Channel<T> {
153213
return;
154214
}
155215

216+
// If there are waiting receivers with ok, directly pass the value and ok=true
217+
if (this.receiversWithOk.length > 0) {
218+
const receiver = this.receiversWithOk.shift()!;
219+
receiver({ value, ok: true });
220+
return;
221+
}
222+
156223
// If buffer is not full, add to buffer
157224
if (this.buffer.length < this.capacity) {
158225
this.buffer.push(value);
@@ -186,7 +253,7 @@ class BufferedChannel<T> implements Channel<T> {
186253
return value;
187254
}
188255

189-
// If channel is closed and buffer is empty, throw error
256+
// If channel is closed and buffer is empty, throw error (receive without ok)
190257
if (this.closed) {
191258
throw new Error("receive on closed channel");
192259
}
@@ -197,6 +264,96 @@ class BufferedChannel<T> implements Channel<T> {
197264
});
198265
}
199266

267+
async receiveWithOk(): Promise<ChannelReceiveResult<T>> {
268+
// If buffer has values, return from buffer with ok=true
269+
if (this.buffer.length > 0) {
270+
const value = this.buffer.shift()!;
271+
272+
// If there are waiting senders, unblock one
273+
if (this.senders.length > 0) {
274+
const sender = this.senders.shift()!;
275+
sender(true); // Unblock with success
276+
}
277+
278+
return { value, ok: true };
279+
}
280+
281+
// If channel is closed and buffer is empty, return zero value with ok=false
282+
if (this.closed) {
283+
return { value: this.zeroValue, ok: false }; // Return zeroValue
284+
}
285+
286+
// Buffer is empty, block the receiver with ok
287+
return new Promise<ChannelReceiveResult<T>>((resolve) => {
288+
this.receiversWithOk.push(resolve);
289+
});
290+
}
291+
292+
async selectReceive(id: number): Promise<SelectResult<T>> {
293+
// If buffer has values, return from buffer with ok=true
294+
if (this.buffer.length > 0) {
295+
const value = this.buffer.shift()!;
296+
297+
// If there are waiting senders, unblock one
298+
if (this.senders.length > 0) {
299+
const sender = this.senders.shift()!;
300+
sender(true); // Unblock with success
301+
}
302+
303+
return { value, ok: true, id };
304+
}
305+
306+
// If channel is closed and buffer is empty, return zero value with ok=false
307+
if (this.closed) {
308+
return { value: this.zeroValue, ok: false, id }; // Return zeroValue
309+
}
310+
311+
// Buffer is empty, return a promise that will be resolved when a value is available
312+
return new Promise<SelectResult<T>>((resolve) => {
313+
this.receiversWithOk.push((result) => {
314+
resolve({ ...result, id });
315+
});
316+
});
317+
}
318+
319+
async selectSend(value: T, id: number): Promise<SelectResult<boolean>> {
320+
if (this.closed) {
321+
return { value: false, ok: false, id };
322+
}
323+
324+
// If there are waiting receivers, directly pass the value
325+
if (this.receivers.length > 0) {
326+
const receiver = this.receivers.shift()!;
327+
receiver(value);
328+
return { value: true, ok: true, id };
329+
}
330+
331+
// If there are waiting receivers with ok, directly pass the value and ok=true
332+
if (this.receiversWithOk.length > 0) {
333+
const receiver = this.receiversWithOk.shift()!;
334+
receiver({ value, ok: true });
335+
return { value: true, ok: true, id };
336+
}
337+
338+
// If buffer is not full, add to buffer
339+
if (this.buffer.length < this.capacity) {
340+
this.buffer.push(value);
341+
return { value: true, ok: true, id };
342+
}
343+
344+
// Buffer is full, return a promise that will be resolved when buffer space is available
345+
return new Promise<SelectResult<boolean>>((resolve, reject) => {
346+
this.senders.push((success: boolean) => {
347+
if (success) {
348+
this.buffer.push(value);
349+
resolve({ value: true, ok: true, id });
350+
} else {
351+
resolve({ value: false, ok: false, id });
352+
}
353+
});
354+
});
355+
}
356+
200357
close(): void {
201358
if (this.closed) {
202359
throw new Error("close of closed channel");
@@ -210,19 +367,141 @@ class BufferedChannel<T> implements Channel<T> {
210367
}
211368
this.senders = [];
212369

213-
// Unblock all waiting receivers with undefined
370+
// Unblock all waiting receivers with the zero value
214371
for (const receiver of this.receivers) {
215-
receiver(undefined as any);
372+
// Note: receive() without ok throws on closed channel, this unblocking should not happen in correct Go logic
373+
// but for safety, we'll resolve with zero value if it somehow does.
374+
receiver(this.zeroValue);
216375
}
217376
this.receivers = [];
377+
378+
// Unblock all waiting receivers with ok=false and zero value
379+
for (const receiver of this.receiversWithOk) {
380+
receiver({ value: this.zeroValue, ok: false });
381+
}
382+
this.receiversWithOk = [];
383+
}
384+
385+
/**
386+
* Checks if the channel has data ready to be received without blocking.
387+
* Used for non-blocking select operations.
388+
*/
389+
canReceiveNonBlocking(): boolean {
390+
return this.buffer.length > 0 || this.closed;
391+
}
392+
393+
/**
394+
* Checks if the channel can accept a send operation without blocking.
395+
* Used for non-blocking select operations.
396+
*/
397+
canSendNonBlocking(): boolean {
398+
return !this.closed && this.buffer.length < this.capacity;
399+
}
400+
}
401+
402+
/**
403+
* Represents a case in a select statement.
404+
*/
405+
export interface SelectCase<T> {
406+
id: number;
407+
isSend: boolean; // true for send, false for receive
408+
channel: Channel<any>;
409+
value?: any; // Value to send for send cases
410+
// Optional handlers for when this case is selected
411+
onSelected?: (result: SelectResult<T>) => void | Promise<void>;
412+
}
413+
414+
415+
/**
416+
* Helper for 'select' statements. Takes an array of select cases
417+
* and resolves when one of them completes, following Go's select rules.
418+
*
419+
* @param cases Array of SelectCase objects
420+
* @param hasDefault Whether there is a default case
421+
* @returns A promise that resolves with the result of the selected case
422+
*/
423+
export async function selectStatement<T>(
424+
cases: SelectCase<T>[],
425+
hasDefault: boolean = false
426+
): Promise<void> { // Changed return type to void as onSelected handles the case body
427+
if (cases.length === 0 && !hasDefault) {
428+
// Go spec: If there are no cases, the select statement blocks forever.
429+
// Emulate blocking forever with a promise that never resolves.
430+
return new Promise<void>(() => {}); // Promise never resolves
431+
}
432+
433+
// 1. Check for ready (non-blocking) operations
434+
const readyCases: SelectCase<T>[] = [];
435+
for (const caseObj of cases) {
436+
if (caseObj.id === -1) { // Skip default case in this check
437+
continue
438+
}
439+
if (caseObj.isSend) {
440+
if (caseObj.channel.canSendNonBlocking()) {
441+
readyCases.push(caseObj);
442+
}
443+
} else {
444+
if (caseObj.channel.canReceiveNonBlocking()) {
445+
readyCases.push(caseObj);
446+
}
447+
}
448+
}
449+
450+
if (readyCases.length > 0) {
451+
// If one or more cases are ready, choose one pseudo-randomly
452+
const selectedCase = readyCases[Math.floor(Math.random() * readyCases.length)];
453+
454+
// Execute the selected operation and its onSelected handler
455+
if (selectedCase.isSend) {
456+
const result = await selectedCase.channel.selectSend(selectedCase.value, selectedCase.id);
457+
if (selectedCase.onSelected) {
458+
await selectedCase.onSelected(result as SelectResult<T>); // Await the handler
459+
}
460+
} else {
461+
const result = await selectedCase.channel.selectReceive(selectedCase.id);
462+
if (selectedCase.onSelected) {
463+
await selectedCase.onSelected(result); // Await the handler
464+
}
465+
}
466+
return; // Return after executing a ready case
467+
}
468+
469+
// 2. If no operations are ready and there's a default case, select default
470+
if (hasDefault) {
471+
// Find the default case (it will have id -1)
472+
const defaultCase = cases.find(c => c.id === -1);
473+
if (defaultCase && defaultCase.onSelected) {
474+
// Execute the onSelected handler for the default case
475+
await defaultCase.onSelected({ value: undefined, ok: false, id: -1 } as SelectResult<T>); // Await the handler
476+
}
477+
return; // Return after executing the default case
478+
}
479+
480+
// 3. If no operations are ready and no default case, block until one is ready
481+
// Use Promise.race on the blocking promises
482+
const blockingPromises = cases.filter(c => c.id !== -1).map(caseObj => { // Exclude default case
483+
if (caseObj.isSend) {
484+
return caseObj.channel.selectSend(caseObj.value, caseObj.id);
485+
} else {
486+
return caseObj.channel.selectReceive(caseObj.id);
487+
}
488+
});
489+
490+
const result = await Promise.race(blockingPromises);
491+
// Execute onSelected handler for the selected case
492+
const selectedCase = cases.find(c => c.id === result.id);
493+
if (selectedCase && selectedCase.onSelected) {
494+
await selectedCase.onSelected(result); // Await the handler
218495
}
496+
// No explicit return needed here, as the function will implicitly return after the await
219497
}
220498

221499
/**
222-
* Creates a new channel with the specified buffer size.
500+
* Creates a new channel with the specified buffer size and zero value.
223501
* @param bufferSize The size of the channel buffer. If 0, creates an unbuffered channel.
502+
* @param zeroValue The zero value for the channel's element type.
224503
* @returns A new channel instance.
225504
*/
226-
export const makeChannel = <T>(bufferSize: number): Channel<T> => {
227-
return new BufferedChannel<T>(bufferSize);
505+
export const makeChannel = <T>(bufferSize: number, zeroValue: T): Channel<T> => {
506+
return new BufferedChannel<T>(bufferSize, zeroValue);
228507
};

0 commit comments

Comments
 (0)