Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const configInstance = await config({
pollIntervalMs: 30000,
onChange: () => {
console.log('Configuration changed! The process will now restart...');
// Note: The SDK will forcefully terminate the process
// immediately after this callback completes to trigger a fresh restart.
}
});

Expand All @@ -54,7 +56,7 @@ The `ConfigInstance` interface represents your way to interact with the configur

##### `getAll(): T`

- **Description**: Retrieves the entire configuration object.
- **Description**: Retrieves the entire configuration object. If hot-reloading is active, this returns the **most recent** configuration state.
- **Returns**: The entire configuration object.

##### `getConfigParts(): { localConfig: object; config: object; envConfig: object }`
Expand Down Expand Up @@ -143,6 +145,34 @@ This package allows you to configure various options for loading and managing co
- **Description**: The polling interval in milliseconds for hot-reloading.
- **Environment Variable**: `CONFIG_POLL_INTERVAL_MS`

### `rolloutKey`
- **Type**: `string`
- **Optional**: `true`
- **Default**: `PACKAGE_NAME`
- **Description**: The key used for the distributed lock (opaque identifier for the resource).
- **Environment Variable**: `CONFIG_ROLLOUT_KEY`

### `callerId`
- **Type**: `string`
- **Optional**: `true`
- **Default**: `os.hostname()`
- **Description**: The unique ID of the instance holding the lock.
- **Environment Variable**: `CONFIG_CALLER_ID`

### `rolloutLimit`
- **Type**: `number`
- **Optional**: `true`
- **Default**: `1`
- **Description**: The maximum number of concurrent rollouts allowed.
- **Environment Variable**: `CONFIG_ROLLOUT_LIMIT`

### `lockTtlSeconds`
- **Type**: `number`
- **Optional**: `true`
- **Default**: `20`
- **Description**: The time-to-live for the lock in seconds.
- **Environment Variable**: `CONFIG_LOCK_TTL_SECONDS`

### `onChange`
- **Type**: `(config: T) => void | Promise<void>`
- **Optional**: `true`
Expand All @@ -159,6 +189,10 @@ The following environment variables can be used to configure the options:
- `CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR`: Sets the `ignoreServerIsOlderVersionError` option.
- `CONFIG_POLL_INTERVAL_MS`: Sets the `pollIntervalMs` option.
- `CONFIG_DISABLE_HOT_RELOAD`: Sets the `disableHotReload` option.
- `CONFIG_ROLLOUT_KEY`: Sets the `rolloutKey` option.
- `CONFIG_CALLER_ID`: Sets the `callerId` option.
- `CONFIG_ROLLOUT_LIMIT`: Sets the `rolloutLimit` option.
- `CONFIG_LOCK_TTL_SECONDS`: Sets the `lockTtlSeconds` option.

## Configuration Merging and Validation

Expand All @@ -173,6 +207,10 @@ The package supports merging configurations from multiple sources (local, remote
1. The remote configuration is fetched from the server specified by the `configServerUrl` option.
2. If the `version` is set to `'latest'`, the latest version of the configuration is fetched. Otherwise, the specified version is fetched.
3. **Continuous Polling:** The SDK continuously polls the server using HTTP ETags (`If-None-Match`). When a `200 OK` is received (indicating a change), the `onChange` callback is triggered (if provided), and the process is then terminated to allow for a fresh start with the new configuration. `304 Not Modified` responses are silently ignored. To prevent cluster-wide traffic spikes (thundering herd), a **randomized jitter of +/- 15%** is automatically applied to each polling cycle.
4. **Distributed Semaphore Locking:** To control rollout concurrency across a cluster, the SDK implements a distributed locking mechanism using four parameters: `key` (opaque identifier), `callerId` (instance ID), `ttl` (lock duration), and `limit` (max concurrent locks).
- **Lock Acquisition:** Before triggering the `onChange` callback during a hot-reload, the SDK attempts to acquire a lock from the configuration server.
- **Lock Release:** The lock is automatically released after the `onChange` callback completes (whether it succeeds or throws).
- **423 Locked & Retry-After:** If the server returns `423 Locked`, it indicates the rollout limit has been reached. The SDK will respect the `Retry-After` header provided by the server, waiting for the specified duration before attempting to acquire the lock again.

### Environment Variables

Expand All @@ -188,10 +226,11 @@ If the value of the `x-env-format` key is `json`, the environment variable value
- Local configuration

2. If a configuration option is specified in multiple sources, the value from the source with higher precedence (as listed above) is used.
3. When a hot-reload occurs, the **Remote configuration** part is updated, and the merge is re-calculated, ensuring environment variables still win.

### Validation

1. After merging, the final configuration is validated against the defined schema using ajv.
1. After merging (and after every hot-reload), the final configuration is validated against the defined schema using ajv.
2. The validation ensures that all required properties are present, and the types and values of properties conform to the schema.
3. Any default value according to the schema is added to the final object.
4. If the validation fails, an error is thrown (for initial boot).
Expand Down
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"undici": "^7.3.0"
},
"peerDependencies": {
"@map-colonies/schemas": "^1.9.0",
"@map-colonies/schemas": "^1.22.0",
"prom-client": "^15.0.0"
},
"peerDependenciesMeta": {
Expand Down
7 changes: 5 additions & 2 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { createConfigError } from './errors';
import { initializeMetrics as initializeMetricsInternal } from './metrics';
import { deepFreeze } from './utils/helpers';
import { ChangeDetector } from './rollout/ChangeDetector';
import { LockCoordinator } from './rollout/LockCoordinator';

const debug = createDebug('config');

Expand All @@ -30,7 +31,7 @@ const semverSatisfies = '2.x';
* polling mechanism. When a configuration change is detected, the SDK
* will execute the `onChange` callback (if provided).
* Depending on the `terminatePod` option (default `true`), it will either
* stop polling and wait for the user to terminate the process, or continue polling.
* stop polling and wait for the user to terminate the process, or release the lock and continue polling.
*
* @template T - The type of the configuration schema.
* @param {ConfigOptions<T>} options - The options for retrieving the configuration.
Expand Down Expand Up @@ -132,7 +133,9 @@ export async function config<T extends { [typeSymbol]: unknown; $id: string }>(

// Setup polling
if (!disableHotReload) {
changeDetector = new ChangeDetector(baseSchema.$id, initOptions, currentEtag, onChange);
const lockCoordinator = new LockCoordinator(initOptions);
await lockCoordinator.release();
changeDetector = new ChangeDetector(baseSchema.$id, initOptions, currentEtag, lockCoordinator, onChange);
changeDetector.start();
}
} else {
Expand Down
56 changes: 56 additions & 0 deletions src/httpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,59 @@ export async function getServerCapabilities(): Promise<ServerCapabilities> {
debug('Server capabilities fetched successfully');
return (await body.json()) as ServerCapabilities;
}

export async function acquireLock(key: string, callerId: string, limit: number, ttl: number): Promise<{ acquired: boolean; retryAfter?: number }> {
debug('Acquiring lock for key %s (caller: %s) with limit %d and ttl %d', key, callerId, limit, ttl);
const { configServerUrl } = getOptions();
const url = `${configServerUrl}/locks`;

const res = await request(url, {
method: 'POST',
body: JSON.stringify({ key, callerId, limit, ttl }),
headers: { 'Content-Type': 'application/json' },
});

if (res.statusCode === statusCodes.OK) {
debug('Lock acquired successfully');
return { acquired: true };
}

if (res.statusCode === statusCodes.LOCKED) {
const retryAfterHeader = res.headers['retry-after'];
const retryAfter = retryAfterHeader !== undefined ? parseInt(retryAfterHeader as string, 10) : undefined;
debug('Lock is already held. Retry-after: %d', retryAfter);
return { acquired: false, retryAfter };
}

if (res.statusCode === statusCodes.BAD_REQUEST) {
debug('Failed to acquire lock. Bad request');
throw createConfigError('httpResponseError', 'Failed to acquire lock', await createHttpErrorPayload(res));
}

debug('Unexpected status code while acquiring lock: %d', res.statusCode);
return { acquired: false };
}

export async function releaseLock(key: string, callerId: string): Promise<void> {
debug('Releasing lock for key %s (caller: %s)', key, callerId);
const { configServerUrl } = getOptions();
const url = `${configServerUrl}/locks/${key}/${callerId}`;

try {
const res = await request(url, { method: 'DELETE' });

if (res.statusCode === statusCodes.NO_CONTENT) {
debug('Lock released successfully');
return;
}

if (res.statusCode === statusCodes.BAD_REQUEST) {
debug('Failed to release lock. Bad request');
throw createConfigError('httpResponseError', 'Failed to release lock', await createHttpErrorPayload(res));
}

debug('Unexpected status code while releasing lock: %d', res.statusCode);
} catch (error) {
debug('Error during best-effort lock release (swallowed): %s', (error as Error).message);
}
}
9 changes: 9 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { hostname } from 'os';
import deepmerge from 'deepmerge';
import { BaseOptions, optionsSchema } from './types';
import { ajvOptionsValidator, validate } from './validator';
Expand All @@ -16,6 +17,10 @@ const defaultOptions: BaseOptions = {
ignoreServerIsOlderVersionError: false,
localConfigPath: './config',
disableHotReload: false,
rolloutKey: PACKAGE_NAME,
rolloutLimit: 1,
lockTtlSeconds: 120,
callerId: hostname(),
terminatePod: true,
};

Expand All @@ -27,6 +32,10 @@ const envOptions: Partial<Record<keyof BaseOptions, string>> = {
ignoreServerIsOlderVersionError: process.env.CONFIG_IGNORE_SERVER_IS_OLDER_VERSION_ERROR,
pollIntervalMs: process.env.CONFIG_POLL_INTERVAL_MS,
disableHotReload: process.env.CONFIG_DISABLE_HOT_RELOAD,
rolloutKey: process.env.CONFIG_ROLLOUT_KEY,
rolloutLimit: process.env.CONFIG_ROLLOUT_LIMIT,
lockTtlSeconds: process.env.CONFIG_LOCK_TTL_SECONDS,
callerId: process.env.CONFIG_CALLER_ID,
terminatePod: process.env.CONFIG_TERMINATE_POD,
};

Expand Down
6 changes: 5 additions & 1 deletion src/rollout/ChangeDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { getRemoteConfig } from '../httpClient';
import { BaseOptions } from '../types';
import { createDebug } from '../utils/debug';
import { isConfigError } from '../errors';
import { LockCoordinator } from './LockCoordinator';

const debug = createDebug('changeDetector');

Expand All @@ -17,7 +18,8 @@ export class ChangeDetector {
public constructor(
private readonly schemaId: string,
private readonly options: BaseOptions,
initialEtag: string,
private readonly initialEtag: string,
private readonly lockCoordinator: LockCoordinator,
private readonly onConfigUpdate?: () => void | Promise<void>
) {
this.currentEtag = initialEtag;
Expand Down Expand Up @@ -75,6 +77,7 @@ export class ChangeDetector {

debug('Config change detected. Stopping polling. New etag: %s', response.etag);
this.stop();
await this.lockCoordinator.acquire();
try {
if (this.onConfigUpdate) {
await this.onConfigUpdate();
Expand All @@ -92,6 +95,7 @@ export class ChangeDetector {
this.currentEtag = response.etag;
debug('Pod termination is not expected.');
this.start();
await this.lockCoordinator.release();
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions src/rollout/LockCoordinator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { acquireLock, releaseLock } from '../httpClient';
import { BaseOptions } from '../types';
import { createDebug } from '../utils/debug';

const debug = createDebug('lockCoordinator');

/**
* Class responsible for coordinating distributed locks to control rollout concurrency.
*/
export class LockCoordinator {
public constructor(private readonly options: BaseOptions) {}

/**
* Acquires a distributed lock. If the lock is already held, it waits according to the Retry-After header.
*/
public async acquire(): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
const { acquired, retryAfter } = await acquireLock(
this.options.rolloutKey,
this.options.callerId,
this.options.rolloutLimit,
this.options.lockTtlSeconds
);

if (acquired) {
return;
}

// If not acquired, wait for retryAfter (in seconds) or a default value of 1 second
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
const waitTime = retryAfter! * 1000;
debug('Lock not acquired, waiting for %d ms', waitTime);
await new Promise((resolve) => setTimeout(resolve, waitTime));
}
}

/**
* Releases the distributed lock.
*/
public async release(): Promise<void> {
await releaseLock(this.options.rolloutKey, this.options.callerId);
}
}
28 changes: 28 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,29 @@ export interface BaseOptions {
* @default false
*/
disableHotReload: boolean;
/**
* The key used for the distributed lock (opaque identifier for the resource).
*/
rolloutKey: string;
/**
* The unique ID of the instance holding the lock.
* @default os.hostname()
*/
callerId: string;
/**
* The maximum number of concurrent rollouts allowed.
* @default 1
*/
rolloutLimit: number;
/**
* The time-to-live for the lock in seconds.
* @default 20
*/
lockTtlSeconds: number;
/**
* Indicates whether the pod will be terminated after an update.
* If true, the SDK will stop polling and not release the lock.
* If false, the SDK will release the lock and continue polling.
* @default true
*/
terminatePod: boolean;
Expand Down Expand Up @@ -123,6 +144,10 @@ export const optionsSchema: JSONSchemaType<BaseOptions> = {
localConfigPath: { type: 'string', default: './config' },
pollIntervalMs: { type: 'integer', default: 30000 },
disableHotReload: { type: 'boolean', default: false },
rolloutKey: { type: 'string' },
callerId: { type: 'string' },
rolloutLimit: { type: 'integer', minimum: 1, default: 1 },
lockTtlSeconds: { type: 'integer', minimum: 1, default: 20 },
terminatePod: { type: 'boolean', default: true },
},
};
Expand All @@ -134,6 +159,7 @@ export const optionsSchema: JSONSchemaType<BaseOptions> = {
export interface ConfigInstance<T> {
/**
* Retrieves the value at the specified path from the configuration object.
* If hot-reloading is active, this returns the value from the most recent configuration update.
* @template TPath - The type of the path.
* @param path - The path to the desired value.
* @returns The value at the specified path.
Expand All @@ -142,12 +168,14 @@ export interface ConfigInstance<T> {

/**
* Retrieves the entire configuration object.
* If hot-reloading is active, this returns the most recent configuration state.
* @returns The entire configuration object.
*/
getAll: () => T;

/**
* Retrieves different parts of the configuration object before being merged and validated.
* If hot-reloading is active, 'config' reflects the latest remote payload.
* @returns An object containing the localConfig, config, and envConfig parts of the configuration.
*/
getConfigParts: () => {
Expand Down
6 changes: 6 additions & 0 deletions tests/config.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { hostname } from 'node:os';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { Interceptable, MockAgent, setGlobalDispatcher } from 'undici';
import { commonDbPartialV1, commonS3PartialV1 } from '@map-colonies/schemas';
import { StatusCodes } from 'http-status-codes';
import { config } from '../src/config';
import { PACKAGE_NAME } from '../src/constants';
import { createMockConfigData } from './mocks';

const URL = 'http://localhost:8080';
Expand Down Expand Up @@ -217,6 +219,10 @@ describe('config', () => {
pollIntervalMs: 3000,
disableHotReload: false,
ignoreServerIsOlderVersionError: false,
lockTtlSeconds: 120,
rolloutKey: PACKAGE_NAME,
callerId: hostname(),
rolloutLimit: 1,
terminatePod: true,
});
});
Expand Down
Loading
Loading