Skip to content
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ The package supports merging configurations from multiple sources (local, remote

1. The remote configuration is fetched from the server specified by the `configServerUrl` option.
2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched.
3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored.
3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle.

### Environment Variables

Expand Down
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ export const SCHEMA_DOMAIN = 'https://mapcolonies.com/';

export const SCHEMAS_PACKAGE_RESOLVED_PATH = require.resolve('@map-colonies/schemas');
export const SCHEMA_BASE_PATH = SCHEMAS_PACKAGE_RESOLVED_PATH.substring(0, SCHEMAS_PACKAGE_RESOLVED_PATH.lastIndexOf(path.sep));

export const JITTER_PERCENTAGE = 0.15;
41 changes: 32 additions & 9 deletions src/rollout/ChangeDetector.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { JITTER_PERCENTAGE } from '../constants';
import { getRemoteConfig } from '../httpClient';
import { BaseOptions } from '../types';
import { createDebug } from '../utils/debug';
Expand All @@ -23,23 +24,45 @@ export class ChangeDetector {
}

public start(): void {
const interval = this.options.pollIntervalMs;
debug('Starting change detector with interval %d ms', interval);

this.timer = setInterval(() => {
this.poll().catch((err) => {
debug('Error during polling: %s', (err as Error).message);
});
}, interval);
debug('Starting change detector');
this.scheduleNextPoll();
}

public stop(): void {
if (this.timer) {
clearInterval(this.timer);
clearTimeout(this.timer);
this.timer = undefined;
}
}

private scheduleNextPoll(): void {
if (this.timer) {
clearTimeout(this.timer);
}
const baseInterval = this.options.pollIntervalMs!;
const jitter = baseInterval * JITTER_PERCENTAGE;
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
const randomJitter = (Math.random() * 2 - 1) * jitter;
const nextInterval = baseInterval + randomJitter;

debug('Scheduling next poll in %d ms', nextInterval);
this.timer = setTimeout(() => {
this.poll()
.catch((err) => {
if (isConfigError(err, 'httpResponseError') || isConfigError(err, 'httpGeneralError')) {
debug('Error during polling: %s', err.message);
} else {
debug('Unknown error during polling: %O', err);
}
})
.finally(() => {
if (this.timer !== undefined) {
this.scheduleNextPoll();
}
});
}, nextInterval);
}

private async poll(): Promise<void> {
debug('Polling config %s@%s with etag %s', this.options.configName, this.options.version, this.currentEtag);

Expand Down
65 changes: 59 additions & 6 deletions tests/rollout.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici';
import { commonDbPartialV1 } from '@map-colonies/schemas';
import { StatusCodes } from 'http-status-codes';
import { random } from 'lodash';
import { config } from '../src/config';
import { JITTER_PERCENTAGE } from '../src/constants';
import { createMockConfigData } from './mocks';

const URL = 'http://localhost:8080';
Expand Down Expand Up @@ -61,7 +63,7 @@ describe('Continuous Polling (ChangeDetector)', () => {
.reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } });

// Act (Wait for Poll)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert (Updated State)
expect(onChangeMock).toHaveBeenCalled();
Expand Down Expand Up @@ -102,13 +104,13 @@ describe('Continuous Polling (ChangeDetector)', () => {
.reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } });

// Act (Wait for First Poll)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert (First change triggered)
expect(onChangeMock).toHaveBeenCalledTimes(1);

// Act (Advance time to see if polling continues)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert (Polling stopped, no further calls)
expect(onChangeMock).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -150,7 +152,7 @@ describe('Continuous Polling (ChangeDetector)', () => {
.reply(StatusCodes.OK, newConfigData1, { headers: { etag: 'etag-2' } });

// Act (Wait for First Poll)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert (First change triggered)
expect(onChangeMock).toHaveBeenCalledTimes(1);
Expand All @@ -165,7 +167,7 @@ describe('Continuous Polling (ChangeDetector)', () => {
.reply(StatusCodes.OK, newConfigData2, { headers: { etag: 'etag-3' } });

// Act (Wait for Second Poll)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert (Second change triggered)
expect(onChangeMock).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -201,7 +203,7 @@ describe('Continuous Polling (ChangeDetector)', () => {
.reply(StatusCodes.NOT_MODIFIED);

// Act (Wait for Poll)
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL);
await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2);

// Assert
expect(onChangeMock).not.toHaveBeenCalled();
Expand Down Expand Up @@ -236,4 +238,55 @@ describe('Continuous Polling (ChangeDetector)', () => {
// Assert
expect(onChangeMock).not.toHaveBeenCalled();
});

it('should apply randomized jitter within boundaries over 10 cycles', async () => {
// Arrange
const initialConfigData = createMockConfigData();

client
.intercept({ path: '/capabilities', method: 'GET' })
.reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false });
client
.intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' })
.reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } });

const cycles = random(1, 10); // Random number of cycles to test jitter
client
.intercept({
path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`,
method: 'GET',
})
.reply(StatusCodes.NOT_MODIFIED)
.times(cycles);

const setTimeoutSpy = vi.spyOn(global, 'setTimeout');

// Act
await config({
configName: 'name',
version: 1,
schema: commonDbPartialV1,
localConfigPath: './tests/config',
onChange: onChangeMock,
});

const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE;
const minWait = DEFAULT_POLL_INTERVAL - maxJitter;
const maxWait = DEFAULT_POLL_INTERVAL + maxJitter;

// Run the randomized number of cycles
for (let i = 0; i < cycles; i++) {
// Advance timers by maxWait to definitely trigger the next poll
await vi.advanceTimersByTimeAsync(maxWait + 1);
}

// Assert
const pollTimeouts = setTimeoutSpy.mock.calls.map((call) => call[1] as number).filter((time) => time >= minWait && time <= maxWait);

expect(pollTimeouts.length).toBeGreaterThanOrEqual(cycles);
pollTimeouts.forEach((time) => {
expect(time).toBeGreaterThanOrEqual(minWait);
expect(time).toBeLessThanOrEqual(maxWait);
});
});
});
Loading