diff --git a/README.md b/README.md index 2657a41..73edeb5 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,7 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. -3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored. +3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. ### Environment Variables diff --git a/src/constants.ts b/src/constants.ts index 0f459d1..8f94983 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -9,3 +9,5 @@ export const SCHEMA_DOMAIN = 'https://mapcolonies.com/'; export const SCHEMAS_PACKAGE_RESOLVED_PATH = require.resolve('@map-colonies/schemas'); export const SCHEMA_BASE_PATH = SCHEMAS_PACKAGE_RESOLVED_PATH.substring(0, SCHEMAS_PACKAGE_RESOLVED_PATH.lastIndexOf(path.sep)); + +export const JITTER_PERCENTAGE = 0.15; diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 4c72cbe..55c850e 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -1,3 +1,4 @@ +import { JITTER_PERCENTAGE } from '../constants'; import { getRemoteConfig } from '../httpClient'; import { BaseOptions } from '../types'; import { createDebug } from '../utils/debug'; @@ -23,23 +24,45 @@ export class ChangeDetector { } public start(): void { - const interval = this.options.pollIntervalMs; - debug('Starting change detector with interval %d ms', interval); - - this.timer = setInterval(() => { - this.poll().catch((err) => { - debug('Error during polling: %s', (err as Error).message); - }); - }, interval); + debug('Starting change detector'); + this.scheduleNextPoll(); } public stop(): void { if (this.timer) { - clearInterval(this.timer); + clearTimeout(this.timer); this.timer = undefined; } } + private scheduleNextPoll(): void { + if (this.timer) { + clearTimeout(this.timer); + } + const baseInterval = this.options.pollIntervalMs!; + const jitter = baseInterval * JITTER_PERCENTAGE; + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + const randomJitter = (Math.random() * 2 - 1) * jitter; + const nextInterval = baseInterval + randomJitter; + + debug('Scheduling next poll in %d ms', nextInterval); + this.timer = setTimeout(() => { + this.poll() + .catch((err) => { + if (isConfigError(err, 'httpResponseError') || isConfigError(err, 'httpGeneralError')) { + debug('Error during polling: %s', err.message); + } else { + debug('Unknown error during polling: %O', err); + } + }) + .finally(() => { + if (this.timer !== undefined) { + this.scheduleNextPoll(); + } + }); + }, nextInterval); + } + private async poll(): Promise { debug('Polling config %s@%s with etag %s', this.options.configName, this.options.version, this.currentEtag); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index c6a0276..4439b6c 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -2,7 +2,9 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; +import { random } from 'lodash'; import { config } from '../src/config'; +import { JITTER_PERCENTAGE } from '../src/constants'; import { createMockConfigData } from './mocks'; const URL = 'http://localhost:8080'; @@ -61,7 +63,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (Updated State) expect(onChangeMock).toHaveBeenCalled(); @@ -102,13 +104,13 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); // Act (Wait for First Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (First change triggered) expect(onChangeMock).toHaveBeenCalledTimes(1); // Act (Advance time to see if polling continues) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (Polling stopped, no further calls) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -150,7 +152,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData1, { headers: { etag: 'etag-2' } }); // Act (Wait for First Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (First change triggered) expect(onChangeMock).toHaveBeenCalledTimes(1); @@ -165,7 +167,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.OK, newConfigData2, { headers: { etag: 'etag-3' } }); // Act (Wait for Second Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert (Second change triggered) expect(onChangeMock).toHaveBeenCalledTimes(2); @@ -201,7 +203,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.NOT_MODIFIED); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); // Assert expect(onChangeMock).not.toHaveBeenCalled(); @@ -236,4 +238,55 @@ describe('Continuous Polling (ChangeDetector)', () => { // Assert expect(onChangeMock).not.toHaveBeenCalled(); }); + + it('should apply randomized jitter within boundaries over 10 cycles', async () => { + // Arrange + const initialConfigData = createMockConfigData(); + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const cycles = random(1, 10); // Random number of cycles to test jitter + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + }) + .reply(StatusCodes.NOT_MODIFIED) + .times(cycles); + + const setTimeoutSpy = vi.spyOn(global, 'setTimeout'); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + }); + + const maxJitter = DEFAULT_POLL_INTERVAL * JITTER_PERCENTAGE; + const minWait = DEFAULT_POLL_INTERVAL - maxJitter; + const maxWait = DEFAULT_POLL_INTERVAL + maxJitter; + + // Run the randomized number of cycles + for (let i = 0; i < cycles; i++) { + // Advance timers by maxWait to definitely trigger the next poll + await vi.advanceTimersByTimeAsync(maxWait + 1); + } + + // Assert + const pollTimeouts = setTimeoutSpy.mock.calls.map((call) => call[1] as number).filter((time) => time >= minWait && time <= maxWait); + + expect(pollTimeouts.length).toBeGreaterThanOrEqual(cycles); + pollTimeouts.forEach((time) => { + expect(time).toBeGreaterThanOrEqual(minWait); + expect(time).toBeLessThanOrEqual(maxWait); + }); + }); });