Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -901,19 +901,13 @@ describe('GraphQL subscriptions', () => {
expect(subscriptionErrorSpy).toHaveBeenCalledWith<
[QminderGraphQLError[]]
>([
{
message: 'Subscription failed after 5 retries',
errorType: 'ERROR',
},
] satisfies QminderGraphQLError[]);
{ message: 'The maximum subscription limit of 100 has been reached' },
]);

expect(subscription2ErrorSpy).toHaveBeenCalledWith<
[QminderGraphQLError[]]
>([
{
message: 'Subscription failed after 5 retries',
errorType: 'ERROR',
},
{ message: 'The maximum subscription limit of 100 has been reached' },
]);

subscription.unsubscribe();
Expand Down
46 changes: 24 additions & 22 deletions packages/javascript-api/src/lib/services/graphql/graphql.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ export class GraphqlService {
| {
readonly type: 'add';
readonly messageId: string;
readonly errors: QminderGraphQLError[];
}
| {
readonly type: 'remove';
Expand All @@ -142,27 +143,27 @@ export class GraphqlService {
}
>();

private readonly retryableErroredSubscriptionsMessageIds$ =
private readonly retryableErroredSubscriptions$ =
this.retryableErroredSubscriptionsAction$.pipe(
scan((messageIds, action) => {
const result = new Set(messageIds);
scan((subscriptions, action) => {
const result = new Map(subscriptions);

switch (action.type) {
case 'add':
return result.add(action.messageId);
return result.set(action.messageId, action.errors);
case 'remove':
result.delete(action.messageId);
return result;
case 'clear':
return new Set();
return new Map<string, QminderGraphQLError[]>();
}
}, new Set<string>()),
startWith(new Set<string>()),
}, new Map<string, QminderGraphQLError[]>()),
startWith(new Map<string, QminderGraphQLError[]>()),
shareReplay(1),
);

private readonly haveAnyRetryableSubscriptionsErrored$ =
this.retryableErroredSubscriptionsMessageIds$.pipe(
this.retryableErroredSubscriptions$.pipe(
map(({ size }) => !!size),
distinctUntilChanged(),
);
Expand Down Expand Up @@ -193,7 +194,7 @@ export class GraphqlService {
shareReplay(1),
);

this.retryableErroredSubscriptionsMessageIds$.subscribe();
this.retryableErroredSubscriptions$.subscribe();
}

/**
Expand Down Expand Up @@ -586,6 +587,7 @@ export class GraphqlService {
this.retryableErroredSubscriptionsAction$.next({
type: 'add',
messageId: message.id,
errors,
});

if (
Expand Down Expand Up @@ -734,7 +736,9 @@ export class GraphqlService {
const delay = calculateRandomizedExponentialBackoffTime(retryCount);

this.logger.info(
`Retry (${retryCount}) errored subscriptions in ${delay.toFixed(0)}ms`,
`Retry (retry count: ${retryCount}) errored subscriptions in ${delay.toFixed(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before one may have read that e.g. in case of Retry (2) errored subscriptions ..., we're trying 2 errored subscriptions, and for the next retry, 3, etc.

0,
)}ms`,
);

this.retryableErroredSubscriptionsRetryTimeout = setTimeout(() => {
Expand All @@ -746,29 +750,27 @@ export class GraphqlService {

private failErroredSubscriptions(): void {
this.logger.error(
`Errored subscriptions retry limit (${RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached, giving up`,
`Errored subscriptions retry limit (${RETRYABLE_ERRORED_SUBSCRIPTIONS_RETRY_LIMIT}) reached. Giving up after ${this.retryableErroredSubscriptionsRetryCount} retries`,
);

this.retryableErroredSubscriptionsMessageIds$
this.retryableErroredSubscriptions$
.pipe(take(1))
.subscribe((messageIds) => {
for (const messageId of messageIds) {
.subscribe((subscriptions) => {
for (const [messageId, errors] of subscriptions) {
const subscriber = this.messagesSubscribers.get(messageId);
this.cleanUpSubscription(messageId);

subscriber?.error([
{
message: `Subscription failed after ${this.retryableErroredSubscriptionsRetryCount} retries`,
errorType: 'ERROR',
},
] satisfies QminderGraphQLError[]);
subscriber?.error(errors);
}
});
}

private retryErroredSubscriptions(): void {
this.retryableErroredSubscriptionsMessageIds$
.pipe(take(1))
this.retryableErroredSubscriptions$
.pipe(
take(1),
map((subscriptions) => subscriptions.keys()),
)
.subscribe((messageIds) => {
for (const messageId of messageIds) {
const subscription = this.subscriptions.find(
Expand Down
Loading