diff --git a/README.md b/README.md index 177a965..0cd5dcc 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,8 @@ const configInstance = await config({ pollIntervalMs: 30000, onChange: () => { console.log('Configuration changed! The process will now restart...'); + // Note: The SDK will forcefully terminate the process + // immediately after this callback completes to trigger a fresh restart. } }); @@ -54,7 +56,7 @@ The `ConfigInstance` interface represents your way to interact with the configur ##### `getAll(): T` -- **Description**: Retrieves the entire configuration object. +- **Description**: Retrieves the entire configuration object. If hot-reloading is active, this returns the **most recent** configuration state. - **Returns**: The entire configuration object. ##### `getConfigParts(): { localConfig: object; config: object; envConfig: object }` @@ -143,6 +145,34 @@ This package allows you to configure various options for loading and managing co - **Description**: The polling interval in milliseconds for hot-reloading. - **Environment Variable**: `CONFIG_POLL_INTERVAL_MS` +### `rolloutKey` +- **Type**: `string` +- **Optional**: `true` +- **Default**: `PACKAGE_NAME` +- **Description**: The key used for the distributed lock (opaque identifier for the resource). +- **Environment Variable**: `CONFIG_ROLLOUT_KEY` + +### `callerId` +- **Type**: `string` +- **Optional**: `true` +- **Default**: `os.hostname()` +- **Description**: The unique ID of the instance holding the lock. +- **Environment Variable**: `CONFIG_CALLER_ID` + +### `rolloutLimit` +- **Type**: `number` +- **Optional**: `true` +- **Default**: `1` +- **Description**: The maximum number of concurrent rollouts allowed. +- **Environment Variable**: `CONFIG_ROLLOUT_LIMIT` + +### `lockTtlSeconds` +- **Type**: `number` +- **Optional**: `true` +- **Default**: `20` +- **Description**: The time-to-live for the lock in seconds. +- **Environment Variable**: `CONFIG_LOCK_TTL_SECONDS` + ### `onChange` - **Type**: `(config: T) => void | Promise` - **Optional**: `true` @@ -159,6 +189,10 @@ The following environment variables can be used to configure the options: - `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR`: Sets the `ignoreServerIsOlderVersionError` option. - `CONFIG_POLL_INTERVAL_MS`: Sets the `pollIntervalMs` option. - `CONFIG_DISABLE_HOT_RELOAD`: Sets the `disableHotReload` option. +- `CONFIG_ROLLOUT_KEY`: Sets the `rolloutKey` option. +- `CONFIG_CALLER_ID`: Sets the `callerId` option. +- `CONFIG_ROLLOUT_LIMIT`: Sets the `rolloutLimit` option. +- `CONFIG_LOCK_TTL_SECONDS`: Sets the `lockTtlSeconds` option. ## Configuration Merging and Validation @@ -173,6 +207,10 @@ The package supports merging configurations from multiple sources (local, remote 1. The remote configuration is fetched from the server specified by the `configServerUrl` option. 2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched. 3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle. +4. **Distributed Semaphore Locking:** To control rollout concurrency across a cluster, the SDK implements a distributed locking mechanism using four parameters: `key` (opaque identifier), `callerId` (instance ID), `ttl` (lock duration), and `limit` (max concurrent locks). + - **Lock Acquisition:** Before triggering the `onChange` callback during a hot-reload, the SDK attempts to acquire a lock from the configuration server. + - **Lock Release:** The lock is automatically released after the `onChange` callback completes (whether it succeeds or throws). + - **423 Locked & Retry-After:** If the server returns `423 Locked`, it indicates the rollout limit has been reached. The SDK will respect the `Retry-After` header provided by the server, waiting for the specified duration before attempting to acquire the lock again. ### Environment Variables @@ -188,10 +226,11 @@ If the value of the `x-env-format` key is `json`, the environment variable value - Local configuration 2. If a configuration option is specified in multiple sources, the value from the source with higher precedence (as listed above) is used. +3. When a hot-reload occurs, the **Remote configuration** part is updated, and the merge is re-calculated, ensuring environment variables still win. ### Validation -1. After merging, the final configuration is validated against the defined schema using ajv. +1. After merging (and after every hot-reload), the final configuration is validated against the defined schema using ajv. 2. The validation ensures that all required properties are present, and the types and values of properties conform to the schema. 3. Any default value according to the schema is added to the final object. 4. If the validation fails, an error is thrown (for initial boot). diff --git a/package-lock.json b/package-lock.json index 643a156..eb5b61f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -51,7 +51,7 @@ "node": ">=24.0.0" }, "peerDependencies": { - "@map-colonies/schemas": "^1.9.0", + "@map-colonies/schemas": "^1.22.0", "prom-client": "^15.0.0" }, "peerDependenciesMeta": { @@ -1059,9 +1059,9 @@ } }, "node_modules/@map-colonies/schemas": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@map-colonies/schemas/-/schemas-1.9.0.tgz", - "integrity": "sha512-Xr7bWXfUAQNUAap8U4BnbGgOjW2LwMLd5JTCxIgZ9S4Vussw0ZMvT6rh5r2jIg6dcpu3w2JKZf32vlS7laK4Ug==", + "version": "1.22.0", + "resolved": "https://registry.npmjs.org/@map-colonies/schemas/-/schemas-1.22.0.tgz", + "integrity": "sha512-yieLkagKcCSHiAy3/V6OC+Zo8ougtAVuyP2poFGFJFAUXH17yu8CgxHXTARVuQCKCXxcH/Rem8QWfW3EdTggPA==", "license": "MIT", "peer": true }, diff --git a/package.json b/package.json index ce25e98..395e9c9 100644 --- a/package.json +++ b/package.json @@ -65,7 +65,7 @@ "undici": "^7.3.0" }, "peerDependencies": { - "@map-colonies/schemas": "^1.9.0", + "@map-colonies/schemas": "^1.22.0", "prom-client": "^15.0.0" }, "peerDependenciesMeta": { diff --git a/src/config.ts b/src/config.ts index bca0bcb..9e14551 100644 --- a/src/config.ts +++ b/src/config.ts @@ -16,6 +16,7 @@ import { createConfigError } from './errors'; import { initializeMetrics as initializeMetricsInternal } from './metrics'; import { deepFreeze } from './utils/helpers'; import { ChangeDetector } from './rollout/ChangeDetector'; +import { LockCoordinator } from './rollout/LockCoordinator'; const debug = createDebug('config'); @@ -30,7 +31,7 @@ const semverSatisfies = '2.x'; * polling mechanism. When a configuration change is detected, the SDK * will execute the `onChange` callback (if provided). * Depending on the `terminatePod` option (default `true`), it will either - * stop polling and wait for the user to terminate the process, or continue polling. + * stop polling and wait for the user to terminate the process, or release the lock and continue polling. * * @template T - The type of the configuration schema. * @param {ConfigOptions} options - The options for retrieving the configuration. @@ -132,7 +133,9 @@ export async function config( // Setup polling if (!disableHotReload) { - changeDetector = new ChangeDetector(baseSchema.$id, initOptions, currentEtag, onChange); + const lockCoordinator = new LockCoordinator(initOptions); + await lockCoordinator.release(); + changeDetector = new ChangeDetector(baseSchema.$id, initOptions, currentEtag, lockCoordinator, onChange); changeDetector.start(); } } else { diff --git a/src/httpClient.ts b/src/httpClient.ts index e3f5053..67f6883 100644 --- a/src/httpClient.ts +++ b/src/httpClient.ts @@ -76,3 +76,59 @@ export async function getServerCapabilities(): Promise { debug('Server capabilities fetched successfully'); return (await body.json()) as ServerCapabilities; } + +export async function acquireLock(key: string, callerId: string, limit: number, ttl: number): Promise<{ acquired: boolean; retryAfter?: number }> { + debug('Acquiring lock for key %s (caller: %s) with limit %d and ttl %d', key, callerId, limit, ttl); + const { configServerUrl } = getOptions(); + const url = `${configServerUrl}/locks`; + + const res = await request(url, { + method: 'POST', + body: JSON.stringify({ key, callerId, limit, ttl }), + headers: { 'Content-Type': 'application/json' }, + }); + + if (res.statusCode === statusCodes.OK) { + debug('Lock acquired successfully'); + return { acquired: true }; + } + + if (res.statusCode === statusCodes.LOCKED) { + const retryAfterHeader = res.headers['retry-after']; + const retryAfter = retryAfterHeader !== undefined ? parseInt(retryAfterHeader as string, 10) : undefined; + debug('Lock is already held. Retry-after: %d', retryAfter); + return { acquired: false, retryAfter }; + } + + if (res.statusCode === statusCodes.BAD_REQUEST) { + debug('Failed to acquire lock. Bad request'); + throw createConfigError('httpResponseError', 'Failed to acquire lock', await createHttpErrorPayload(res)); + } + + debug('Unexpected status code while acquiring lock: %d', res.statusCode); + return { acquired: false }; +} + +export async function releaseLock(key: string, callerId: string): Promise { + debug('Releasing lock for key %s (caller: %s)', key, callerId); + const { configServerUrl } = getOptions(); + const url = `${configServerUrl}/locks/${key}/${callerId}`; + + try { + const res = await request(url, { method: 'DELETE' }); + + if (res.statusCode === statusCodes.NO_CONTENT) { + debug('Lock released successfully'); + return; + } + + if (res.statusCode === statusCodes.BAD_REQUEST) { + debug('Failed to release lock. Bad request'); + throw createConfigError('httpResponseError', 'Failed to release lock', await createHttpErrorPayload(res)); + } + + debug('Unexpected status code while releasing lock: %d', res.statusCode); + } catch (error) { + debug('Error during best-effort lock release (swallowed): %s', (error as Error).message); + } +} diff --git a/src/options.ts b/src/options.ts index 4fda7ca..1cc1de2 100644 --- a/src/options.ts +++ b/src/options.ts @@ -1,3 +1,4 @@ +import { hostname } from 'os'; import deepmerge from 'deepmerge'; import { BaseOptions, optionsSchema } from './types'; import { ajvOptionsValidator, validate } from './validator'; @@ -16,6 +17,10 @@ const defaultOptions: BaseOptions = { ignoreServerIsOlderVersionError: false, localConfigPath: './config', disableHotReload: false, + rolloutKey: PACKAGE_NAME, + rolloutLimit: 1, + lockTtlSeconds: 120, + callerId: hostname(), terminatePod: true, }; @@ -27,6 +32,10 @@ const envOptions: Partial> = { ignoreServerIsOlderVersionError: process.env.CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR, pollIntervalMs: process.env.CONFIG_POLL_INTERVAL_MS, disableHotReload: process.env.CONFIG_DISABLE_HOT_RELOAD, + rolloutKey: process.env.CONFIG_ROLLOUT_KEY, + rolloutLimit: process.env.CONFIG_ROLLOUT_LIMIT, + lockTtlSeconds: process.env.CONFIG_LOCK_TTL_SECONDS, + callerId: process.env.CONFIG_CALLER_ID, terminatePod: process.env.CONFIG_TERMINATE_POD, }; diff --git a/src/rollout/ChangeDetector.ts b/src/rollout/ChangeDetector.ts index 8cc3fd3..fe48e40 100644 --- a/src/rollout/ChangeDetector.ts +++ b/src/rollout/ChangeDetector.ts @@ -3,6 +3,7 @@ import { getRemoteConfig } from '../httpClient'; import { BaseOptions } from '../types'; import { createDebug } from '../utils/debug'; import { isConfigError } from '../errors'; +import { LockCoordinator } from './LockCoordinator'; const debug = createDebug('changeDetector'); @@ -17,7 +18,8 @@ export class ChangeDetector { public constructor( private readonly schemaId: string, private readonly options: BaseOptions, - initialEtag: string, + private readonly initialEtag: string, + private readonly lockCoordinator: LockCoordinator, private readonly onConfigUpdate?: () => void | Promise ) { this.currentEtag = initialEtag; @@ -75,6 +77,7 @@ export class ChangeDetector { debug('Config change detected. Stopping polling. New etag: %s', response.etag); this.stop(); + await this.lockCoordinator.acquire(); try { if (this.onConfigUpdate) { await this.onConfigUpdate(); @@ -92,6 +95,7 @@ export class ChangeDetector { this.currentEtag = response.etag; debug('Pod termination is not expected.'); this.start(); + await this.lockCoordinator.release(); } } } diff --git a/src/rollout/LockCoordinator.ts b/src/rollout/LockCoordinator.ts new file mode 100644 index 0000000..019ba24 --- /dev/null +++ b/src/rollout/LockCoordinator.ts @@ -0,0 +1,44 @@ +import { acquireLock, releaseLock } from '../httpClient'; +import { BaseOptions } from '../types'; +import { createDebug } from '../utils/debug'; + +const debug = createDebug('lockCoordinator'); + +/** + * Class responsible for coordinating distributed locks to control rollout concurrency. + */ +export class LockCoordinator { + public constructor(private readonly options: BaseOptions) {} + + /** + * Acquires a distributed lock. If the lock is already held, it waits according to the Retry-After header. + */ + public async acquire(): Promise { + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + while (true) { + const { acquired, retryAfter } = await acquireLock( + this.options.rolloutKey, + this.options.callerId, + this.options.rolloutLimit, + this.options.lockTtlSeconds + ); + + if (acquired) { + return; + } + + // If not acquired, wait for retryAfter (in seconds) or a default value of 1 second + // eslint-disable-next-line @typescript-eslint/no-magic-numbers + const waitTime = retryAfter! * 1000; + debug('Lock not acquired, waiting for %d ms', waitTime); + await new Promise((resolve) => setTimeout(resolve, waitTime)); + } + } + + /** + * Releases the distributed lock. + */ + public async release(): Promise { + await releaseLock(this.options.rolloutKey, this.options.callerId); + } +} diff --git a/src/types.ts b/src/types.ts index ea3163d..aeac89a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -77,8 +77,29 @@ export interface BaseOptions { * @default false */ disableHotReload: boolean; + /** + * The key used for the distributed lock (opaque identifier for the resource). + */ + rolloutKey: string; + /** + * The unique ID of the instance holding the lock. + * @default os.hostname() + */ + callerId: string; + /** + * The maximum number of concurrent rollouts allowed. + * @default 1 + */ + rolloutLimit: number; + /** + * The time-to-live for the lock in seconds. + * @default 20 + */ + lockTtlSeconds: number; /** * Indicates whether the pod will be terminated after an update. + * If true, the SDK will stop polling and not release the lock. + * If false, the SDK will release the lock and continue polling. * @default true */ terminatePod: boolean; @@ -123,6 +144,10 @@ export const optionsSchema: JSONSchemaType = { localConfigPath: { type: 'string', default: './config' }, pollIntervalMs: { type: 'integer', default: 30000 }, disableHotReload: { type: 'boolean', default: false }, + rolloutKey: { type: 'string' }, + callerId: { type: 'string' }, + rolloutLimit: { type: 'integer', minimum: 1, default: 1 }, + lockTtlSeconds: { type: 'integer', minimum: 1, default: 20 }, terminatePod: { type: 'boolean', default: true }, }, }; @@ -134,6 +159,7 @@ export const optionsSchema: JSONSchemaType = { export interface ConfigInstance { /** * Retrieves the value at the specified path from the configuration object. + * If hot-reloading is active, this returns the value from the most recent configuration update. * @template TPath - The type of the path. * @param path - The path to the desired value. * @returns The value at the specified path. @@ -142,12 +168,14 @@ export interface ConfigInstance { /** * Retrieves the entire configuration object. + * If hot-reloading is active, this returns the most recent configuration state. * @returns The entire configuration object. */ getAll: () => T; /** * Retrieves different parts of the configuration object before being merged and validated. + * If hot-reloading is active, 'config' reflects the latest remote payload. * @returns An object containing the localConfig, config, and envConfig parts of the configuration. */ getConfigParts: () => { diff --git a/tests/config.spec.ts b/tests/config.spec.ts index 753bab8..7d10fab 100644 --- a/tests/config.spec.ts +++ b/tests/config.spec.ts @@ -1,8 +1,10 @@ +import { hostname } from 'node:os'; import { beforeEach, describe, expect, it, vi } from 'vitest'; import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; import { commonDbPartialV1, commonS3PartialV1 } from '@map-colonies/schemas'; import { StatusCodes } from 'http-status-codes'; import { config } from '../src/config'; +import { PACKAGE_NAME } from '../src/constants'; import { createMockConfigData } from './mocks'; const URL = 'http://localhost:8080'; @@ -217,6 +219,10 @@ describe('config', () => { pollIntervalMs: 3000, disableHotReload: false, ignoreServerIsOlderVersionError: false, + lockTtlSeconds: 120, + rolloutKey: PACKAGE_NAME, + callerId: hostname(), + rolloutLimit: 1, terminatePod: true, }); }); diff --git a/tests/lock.spec.ts b/tests/lock.spec.ts new file mode 100644 index 0000000..4893a65 --- /dev/null +++ b/tests/lock.spec.ts @@ -0,0 +1,220 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici'; +import { commonDbPartialV1 } from '@map-colonies/schemas'; +import { StatusCodes } from 'http-status-codes'; +import { config } from '../src/config'; +import { JITTER_PERCENTAGE } from '../src/constants'; +import * as httpClient from '../src/httpClient'; +import { createMockConfigData } from './mocks'; + +const URL = 'http://localhost:8080'; +const DEFAULT_POLL_INTERVAL = 30000; + +describe('Distributed Semaphore Locking', () => { + let client: Interceptable; + const onChangeMock = vi.fn(); + + beforeEach(() => { + vi.useFakeTimers(); + const agent = new MockAgent(); + agent.disableNetConnect(); + + setGlobalDispatcher(agent); + client = agent.get(URL); + + // Add default mock for lock release on startup + client + .intercept({ path: /\/locks\/.*/, method: 'DELETE' }) + .reply(StatusCodes.NO_CONTENT) + .persist(); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + it('when terminatePod is false, should acquire lock before onChange and release it after', async () => { + // Arrange + const initialConfigData = createMockConfigData(); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); + + // Cold-start requests + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + terminatePod: false, + }); + + // Arrange (Hot-reload triggers) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Mock Lock Acquisition + client + .intercept({ + path: '/locks', + method: 'POST', + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 120 }), + }) + .reply(StatusCodes.OK); + + // Mock Lock Release + client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + + // Assert + expect(onChangeMock).toHaveBeenCalled(); + }); + + it('when terminatePod is true, should acquire lock before onChange and not release it after', async () => { + // Arrange + const initialConfigData = createMockConfigData(); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); + + // Cold-start requests + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const releaseSpy = vi.spyOn(httpClient, 'releaseLock'); + + // Act + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + terminatePod: true, + }); + + // Arrange (Hot-reload triggers) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + headers: { 'if-none-match': 'etag-1' }, + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Mock Lock Acquisition + client + .intercept({ + path: '/locks', + method: 'POST', + body: JSON.stringify({ key: 'my-lock', callerId: 'my-caller', limit: 1, ttl: 120 }), + }) + .reply(StatusCodes.OK); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + + // Assert + expect(onChangeMock).toHaveBeenCalled(); + // Verify release was only called once (during cold-start) + expect(releaseSpy).toHaveBeenCalledTimes(1); + }); + + it('should release the lock during initial cold-start', async () => { + // Arrange + const initialConfigData = createMockConfigData({ config: { host: 'initial-host' } }); + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + const releaseSpy = vi.spyOn(httpClient, 'releaseLock'); + + // Act + const configInstance = await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + }); + + // Assert + expect(configInstance.get('host')).toBe('initial-host'); + expect(releaseSpy).toHaveBeenCalledWith('my-lock', 'my-caller'); + }); + + it('should wait and retry if lock acquisition returns 423 Locked with Retry-After', async () => { + // Arrange + const initialConfigData = createMockConfigData({ config: { host: 'initial-host' } }); + const newConfigData = createMockConfigData({ config: { host: 'updated-host' }, createdAt: 1 }); + + client + .intercept({ path: '/capabilities', method: 'GET' }) + .reply(StatusCodes.OK, { serverVersion: '2.0.0', schemasPackageVersion: '99.9.9', pubSubEnabled: false }); + client + .intercept({ path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, method: 'GET' }) + .reply(StatusCodes.OK, initialConfigData, { headers: { etag: 'etag-1' } }); + + await config({ + configName: 'name', + version: 1, + schema: commonDbPartialV1, + localConfigPath: './tests/config', + onChange: onChangeMock, + rolloutKey: 'my-lock', + callerId: 'my-caller', + }); + + // Arrange (Hot-reload triggers) + client + .intercept({ + path: `/config/name/1?shouldDereference=true&schemaId=${commonDbPartialV1.$id}`, + method: 'GET', + }) + .reply(StatusCodes.OK, newConfigData, { headers: { etag: 'etag-2' } }); + + // Mock first lock attempt failing with 423 + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.LOCKED, {}, { headers: { 'retry-after': '2' } }); + // Mock second lock attempt succeeding + client.intercept({ path: '/locks', method: 'POST' }).reply(StatusCodes.OK); + client.intercept({ path: '/locks/my-lock/my-caller', method: 'DELETE' }).reply(StatusCodes.NO_CONTENT); + + // Act (Wait for Poll) + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); + + // Wait for the retry interval (2 seconds) + await vi.advanceTimersByTimeAsync(2001); + await vi.waitFor(() => expect(onChangeMock).toHaveBeenCalled()); + + // Assert + expect(onChangeMock).toHaveBeenCalled(); + }); +}); diff --git a/tests/rollout.spec.ts b/tests/rollout.spec.ts index 04c0ff3..61ff3f6 100644 --- a/tests/rollout.spec.ts +++ b/tests/rollout.spec.ts @@ -21,11 +21,19 @@ describe('Continuous Polling (ChangeDetector)', () => { setGlobalDispatcher(agent); client = agent.get(URL); + + client + .intercept({ path: /\/locks.*/, method: 'POST' }) + .reply(StatusCodes.OK) + .persist(); + client + .intercept({ path: /\/locks.*/, method: 'DELETE' }) + .reply(StatusCodes.NO_CONTENT) + .persist(); }); afterEach(() => { - vi.restoreAllMocks(); - onChangeMock.mockReset(); + vi.clearAllMocks(); }); it('should trigger onChange when polling returns a new config (200 OK)', async () => { @@ -203,7 +211,7 @@ describe('Continuous Polling (ChangeDetector)', () => { .reply(StatusCodes.NOT_MODIFIED); // Act (Wait for Poll) - await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * 2); + await vi.advanceTimersByTimeAsync(DEFAULT_POLL_INTERVAL * (1 + JITTER_PERCENTAGE)); // Assert expect(onChangeMock).not.toHaveBeenCalled();