Skip to content

Commit eabaceb

Browse files
authored
Fix PgSubscriber logic (#2874)
2 parents d2ec3d8 + 6f95077 commit eabaceb

2 files changed

Lines changed: 31 additions & 20 deletions

File tree

.changeset/itchy-dingos-repair.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@dataplan/pg": patch
3+
---
4+
5+
Fix bug in PgSubscriber logic causing nesting of iterator results.

grafast/dataplan-pg/src/adaptors/pg.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -524,16 +524,23 @@ export class PgSubscriber<
524524
// eslint-disable-next-line @typescript-eslint/no-this-alias
525525
const that = this;
526526
const { eventEmitter, topics } = this;
527-
const stack: any[] = [];
528-
const queue: Deferred<any>[] = [];
529-
let finished: IteratorReturnResult<any> | false = false;
530-
531-
function doFinally(value?: any) {
532-
if (!finished) {
533-
finished = { done: true, value };
534-
if (queue) {
527+
const stack: Array<TTopics[TTopic]> = [];
528+
const queue: Array<Deferred<IteratorResult<TTopics[TTopic]>>> = [];
529+
let finished: IteratorReturnResult<unknown> | Promise<never> | null = null;
530+
531+
function doFinally(value?: unknown, error?: Error) {
532+
if (finished === null) {
533+
if (error) {
534+
finished = Promise.reject(error);
535+
// Avoid unhandled promise rejection errors
536+
finished.then(null, noop);
537+
} else {
538+
finished = { done: true, value };
539+
}
540+
if (queue.length > 0) {
535541
const promises = queue.splice(0, queue.length);
536-
promises.forEach((p) => p.resolve(finished));
542+
const f = finished; // Appease TypeScript
543+
promises.forEach((p) => p.resolve(f));
537544
}
538545
eventEmitter.removeListener(topic as string, recv);
539546
// Every code path above this has to go through a `yield` and thus
@@ -550,37 +557,36 @@ export class PgSubscriber<
550557
return finished;
551558
}
552559

553-
const asyncIterableIterator: AsyncIterableIterator<any> = {
560+
const asyncIterableIterator: AsyncIterableIterator<TTopics[TTopic]> = {
554561
[Symbol.asyncIterator]() {
555562
return this;
556563
},
557564
async next() {
558565
if (stack.length > 0) {
559-
const value = await stack.shift();
566+
const value = stack.shift() as TTopics[TTopic];
560567
return { done: false, value };
561568
} else if (finished) {
562569
return finished;
563570
} else {
564571
// This must be done synchronously - there must be **NO AWAIT BEFORE THIS**
565-
const waiting = defer();
572+
const waiting = defer<IteratorResult<TTopics[TTopic]>>();
566573
queue.push(waiting);
567574

568-
const value = await waiting;
569-
return { done: false, value };
575+
return waiting;
570576
}
571577
},
572578
async return(value) {
573579
return doFinally(value);
574580
},
575-
async throw() {
576-
return doFinally();
581+
async throw(e) {
582+
return doFinally(undefined, e);
577583
},
578584
};
579585

580-
function recv(payload: any) {
581-
if (queue.length > 0) {
582-
const first = queue.shift();
583-
first!.resolve(payload);
586+
function recv(payload: TTopics[TTopic]) {
587+
const first = queue.shift();
588+
if (first) {
589+
first.resolve({ done: false, value: payload });
584590
} else {
585591
stack.push(payload);
586592
}

0 commit comments

Comments
 (0)