diff --git a/src/rate-limiting/rate-limiting.module.ts b/src/rate-limiting/rate-limiting.module.ts index 325d1857..109280ab 100644 --- a/src/rate-limiting/rate-limiting.module.ts +++ b/src/rate-limiting/rate-limiting.module.ts @@ -11,6 +11,7 @@ import { QuotaDefinitionService } from './services/quota-definition.service'; import { QuotaTrackingService } from './services/quota-tracking.service'; import { QuotaResetScheduler } from './services/quota-reset.scheduler'; import { AdaptiveRateLimitingService } from './services/adaptive-rate-limiting.service'; +import { ContainerCpuMetricsService } from './services/container-cpu-metrics.service'; // Guard & Decorator import { QuotaGuard } from './guards/quota.guard'; @@ -28,6 +29,7 @@ import { UserQuotaController } from './controllers/user-quota.controller'; QuotaManagementService, QuotaResetScheduler, AdaptiveRateLimitingService, + ContainerCpuMetricsService, QuotaGuard, ], exports: [QuotaManagementService, QuotaDefinitionService, QuotaTrackingService, QuotaGuard], diff --git a/src/rate-limiting/services/adaptive-rate-limiting.service.ts b/src/rate-limiting/services/adaptive-rate-limiting.service.ts index 07a6f84f..8426a72f 100644 --- a/src/rate-limiting/services/adaptive-rate-limiting.service.ts +++ b/src/rate-limiting/services/adaptive-rate-limiting.service.ts @@ -1,20 +1,18 @@ import { Injectable } from '@nestjs/common'; -import * as os from 'os'; +import { ContainerCpuMetricsService } from './container-cpu-metrics.service'; /** * Provides adaptive Rate Limiting operations. */ @Injectable() export class AdaptiveRateLimitingService { + constructor(private readonly cpuMetrics: ContainerCpuMetricsService) {} /** * Retrieves system Load Factor. * @returns The calculated numeric value. */ - getSystemLoadFactor(): number { - const load = os.loadavg()[0]; // 1-minute average - const cpuCount = os.cpus().length; - - const loadPercentage = load / cpuCount; + async getSystemLoadFactor(): Promise { + const loadPercentage = await this.cpuMetrics.getCpuLoadRatio(); if (loadPercentage > 0.9) return 0.5; // reduce limits by 50% if (loadPercentage > 0.7) return 0.7; @@ -26,8 +24,8 @@ export class AdaptiveRateLimitingService { * @param baseLimit The maximum number of results. * @returns The calculated numeric value. */ - adjustLimit(baseLimit: number): number { - const factor = this.getSystemLoadFactor(); + async adjustLimit(baseLimit: number): Promise { + const factor = await this.getSystemLoadFactor(); return Math.floor(baseLimit * factor); } } diff --git a/src/rate-limiting/services/container-cpu-metrics.service.spec.ts b/src/rate-limiting/services/container-cpu-metrics.service.spec.ts new file mode 100644 index 00000000..bb90842c --- /dev/null +++ b/src/rate-limiting/services/container-cpu-metrics.service.spec.ts @@ -0,0 +1,95 @@ +jest.mock('fs', () => ({ + readFileSync: jest.fn(), +})); + +jest.mock('os', () => ({ + cpus: jest.fn(), + loadavg: jest.fn(), +})); + +import { readFileSync } from 'fs'; +import * as os from 'os'; +import { ContainerCpuMetricsService } from './container-cpu-metrics.service'; + +describe('ContainerCpuMetricsService', () => { + const mockedReadFileSync = readFileSync as jest.MockedFunction; + const mockedCpus = os.cpus as jest.MockedFunction; + const mockedLoadavg = os.loadavg as jest.MockedFunction; + + const originalFetch = global.fetch; + const originalPrometheusUrl = process.env.PROMETHEUS_METRICS_URL; + + beforeEach(() => { + jest.clearAllMocks(); + process.env.PROMETHEUS_METRICS_URL = undefined; + }); + + afterEach(() => { + global.fetch = originalFetch; + process.env.PROMETHEUS_METRICS_URL = originalPrometheusUrl; + }); + + it('reads cgroup v2 cpu.stat and returns throttling ratio', async () => { + mockedCpus.mockReturnValue([{ model: 'cpu', speed: 1000, times: {} as never }] as never); + mockedReadFileSync.mockReturnValue( + 'usage_usec 100000\nnr_periods 100\nnr_throttled 80\n' as never, + ); + + const service = new ContainerCpuMetricsService(); + const ratio = await service.getCpuLoadRatio(); + + expect(ratio).toBeCloseTo(0.8, 4); + }); + + it('falls back to os.loadavg when cgroup is unavailable', async () => { + mockedReadFileSync.mockImplementation(() => { + throw new Error('missing'); + }); + mockedLoadavg.mockReturnValue([2, 1, 1]); + mockedCpus.mockReturnValue([ + { model: 'cpu-1', speed: 1000, times: {} as never }, + { model: 'cpu-2', speed: 1000, times: {} as never }, + { model: 'cpu-3', speed: 1000, times: {} as never }, + { model: 'cpu-4', speed: 1000, times: {} as never }, + ] as never); + + const service = new ContainerCpuMetricsService(); + const ratio = await service.getCpuLoadRatio(); + + expect(ratio).toBeCloseTo(0.5, 4); + }); + + it('falls back to Prometheus process_cpu_seconds_total when cgroup is unavailable', async () => { + process.env.PROMETHEUS_METRICS_URL = 'http://127.0.0.1:3000/metrics'; + + mockedReadFileSync.mockImplementation(() => { + throw new Error('missing'); + }); + mockedCpus.mockReturnValue([ + { model: 'cpu-1', speed: 1000, times: {} as never }, + { model: 'cpu-2', speed: 1000, times: {} as never }, + ] as never); + mockedLoadavg.mockReturnValue([0.4, 0, 0]); + + jest.spyOn(Date, 'now').mockReturnValueOnce(1000).mockReturnValueOnce(2000); + + global.fetch = jest + .fn() + .mockResolvedValueOnce({ + ok: true, + text: async () => '# HELP process_cpu_seconds_total\nprocess_cpu_seconds_total 10\n', + } as Response) + .mockResolvedValueOnce({ + ok: true, + text: async () => '# HELP process_cpu_seconds_total\nprocess_cpu_seconds_total 11\n', + } as Response); + + const service = new ContainerCpuMetricsService(); + + const firstRatio = await service.getCpuLoadRatio(); + const secondRatio = await service.getCpuLoadRatio(); + + expect(firstRatio).toBeCloseTo(0.2, 4); + expect(secondRatio).toBeCloseTo(0.5, 4); + }); +}); diff --git a/src/rate-limiting/services/container-cpu-metrics.service.ts b/src/rate-limiting/services/container-cpu-metrics.service.ts new file mode 100644 index 00000000..f72fc6f9 --- /dev/null +++ b/src/rate-limiting/services/container-cpu-metrics.service.ts @@ -0,0 +1,156 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { readFileSync } from 'fs'; +import * as os from 'os'; + +interface CpuStatSnapshot { + usageUsec: number; + nrPeriods: number; + nrThrottled: number; + timestampMs: number; +} + +interface ProcessCpuSnapshot { + cpuSecondsTotal: number; + timestampMs: number; +} + +/** + * Reads container-aware CPU metrics with ordered fallbacks: + * 1) cgroups v2 cpu.stat (container-level) + * 2) Prometheus process_cpu_seconds_total scrape (optional) + * 3) host loadavg (last resort) + */ +@Injectable() +export class ContainerCpuMetricsService { + private readonly logger = new Logger(ContainerCpuMetricsService.name); + private readonly cpuStatPath = '/sys/fs/cgroup/cpu.stat'; + private previousCpuStatSnapshot?: CpuStatSnapshot; + private previousProcessCpuSnapshot?: ProcessCpuSnapshot; + + async getCpuLoadRatio(): Promise { + const cgroupRatio = this.getCgroupCpuLoadRatio(); + if (cgroupRatio !== null) { + return cgroupRatio; + } + + const prometheusRatio = await this.getPrometheusCpuLoadRatio(); + if (prometheusRatio !== null) { + return prometheusRatio; + } + + return this.getLoadAvgRatio(); + } + + private getCgroupCpuLoadRatio(): number | null { + try { + const cpuStat = readFileSync(this.cpuStatPath, 'utf8'); + const usageUsec = this.readMetric(cpuStat, 'usage_usec'); + const nrPeriods = this.readMetric(cpuStat, 'nr_periods'); + const nrThrottled = this.readMetric(cpuStat, 'nr_throttled'); + + if (usageUsec === null || nrPeriods === null || nrThrottled === null) { + return null; + } + + const current: CpuStatSnapshot = { + usageUsec, + nrPeriods, + nrThrottled, + timestampMs: Date.now(), + }; + + const throttleRatio = nrPeriods > 0 ? this.clamp(nrThrottled / nrPeriods) : 0; + const cpuCount = Math.max(1, os.cpus().length); + + if (!this.previousCpuStatSnapshot) { + this.previousCpuStatSnapshot = current; + return throttleRatio; + } + + const elapsedUsec = (current.timestampMs - this.previousCpuStatSnapshot.timestampMs) * 1000; + const deltaUsageUsec = current.usageUsec - this.previousCpuStatSnapshot.usageUsec; + this.previousCpuStatSnapshot = current; + + if (elapsedUsec <= 0 || deltaUsageUsec < 0) { + return throttleRatio; + } + + const usageRatio = this.clamp(deltaUsageUsec / elapsedUsec / cpuCount); + return Math.max(usageRatio, throttleRatio); + } catch { + return null; + } + } + + private async getPrometheusCpuLoadRatio(): Promise { + const url = process.env.PROMETHEUS_METRICS_URL; + if (!url) { + return null; + } + + try { + const response = await fetch(url); + if (!response.ok) { + this.logger.warn(`Prometheus metrics scrape failed: status=${response.status}`); + return null; + } + + const metricsText = await response.text(); + const cpuSecondsTotal = this.readPrometheusProcessCpuSeconds(metricsText); + if (cpuSecondsTotal === null) { + return null; + } + + const current: ProcessCpuSnapshot = { cpuSecondsTotal, timestampMs: Date.now() }; + const cpuCount = Math.max(1, os.cpus().length); + + if (!this.previousProcessCpuSnapshot) { + this.previousProcessCpuSnapshot = current; + return null; + } + + const elapsedSeconds = (current.timestampMs - this.previousProcessCpuSnapshot.timestampMs) / 1000; + const deltaCpuSeconds = current.cpuSecondsTotal - this.previousProcessCpuSnapshot.cpuSecondsTotal; + this.previousProcessCpuSnapshot = current; + + if (elapsedSeconds <= 0 || deltaCpuSeconds < 0) { + return null; + } + + return this.clamp(deltaCpuSeconds / elapsedSeconds / cpuCount); + } catch { + return null; + } + } + + private getLoadAvgRatio(): number { + const load = os.loadavg()[0]; + const cpuCount = Math.max(1, os.cpus().length); + return this.clamp(load / cpuCount); + } + + private readMetric(content: string, key: string): number | null { + const match = content.match(new RegExp(`^${key}\\s+(\\d+)`, 'm')); + if (!match) return null; + + const parsed = Number(match[1]); + return Number.isFinite(parsed) ? parsed : null; + } + + private readPrometheusProcessCpuSeconds(metricsText: string): number | null { + const match = metricsText.match( + /^process_cpu_seconds_total(?:\{[^}]*\})?\s+([0-9]+(?:\.[0-9]+)?)$/m, + ); + if (!match) return null; + + const parsed = Number(match[1]); + return Number.isFinite(parsed) ? parsed : null; + } + + private clamp(value: number): number { + if (!Number.isFinite(value)) return 0; + if (value < 0) return 0; + if (value > 1) return 1; + return value; + } +} diff --git a/src/rate-limiting/services/quota-tracking.service.ts b/src/rate-limiting/services/quota-tracking.service.ts index 5ed76bca..13a0c894 100644 --- a/src/rate-limiting/services/quota-tracking.service.ts +++ b/src/rate-limiting/services/quota-tracking.service.ts @@ -31,10 +31,15 @@ export class QuotaTrackingService { */ async checkAndIncrement(userId: string, tier: UserTier): Promise { const baseLimits = await this.definitionService.resolveForUser(userId, tier); + const [requestsPerMinute, requestsPerHour, requestsPerDay] = await Promise.all([ + this.adaptive.adjustLimit(baseLimits.requestsPerMinute), + this.adaptive.adjustLimit(baseLimits.requestsPerHour), + this.adaptive.adjustLimit(baseLimits.requestsPerDay), + ]); const limits = { - requestsPerMinute: this.adaptive.adjustLimit(baseLimits.requestsPerMinute), - requestsPerHour: this.adaptive.adjustLimit(baseLimits.requestsPerHour), - requestsPerDay: this.adaptive.adjustLimit(baseLimits.requestsPerDay), + requestsPerMinute, + requestsPerHour, + requestsPerDay, }; const now = new Date(); @@ -86,10 +91,15 @@ export class QuotaTrackingService { /** Get quota status without incrementing (for status endpoint). */ async getStatus(userId: string, tier: UserTier): Promise { const baseLimits = await this.definitionService.resolveForUser(userId, tier); + const [requestsPerMinute, requestsPerHour, requestsPerDay] = await Promise.all([ + this.adaptive.adjustLimit(baseLimits.requestsPerMinute), + this.adaptive.adjustLimit(baseLimits.requestsPerHour), + this.adaptive.adjustLimit(baseLimits.requestsPerDay), + ]); const limits = { - requestsPerMinute: this.adaptive.adjustLimit(baseLimits.requestsPerMinute), - requestsPerHour: this.adaptive.adjustLimit(baseLimits.requestsPerHour), - requestsPerDay: this.adaptive.adjustLimit(baseLimits.requestsPerDay), + requestsPerMinute, + requestsPerHour, + requestsPerDay, }; const now = new Date(); diff --git a/src/rate-limiting/services/quota.service.ts b/src/rate-limiting/services/quota.service.ts index daa6a9e2..84950a36 100644 --- a/src/rate-limiting/services/quota.service.ts +++ b/src/rate-limiting/services/quota.service.ts @@ -21,10 +21,16 @@ export class QuotaManagementService { /** Resolve and return the effective quota limits for a user. */ async getQuotaForUser(userId: string, tier: UserTier) { const base = await this.definitions.resolveForUser(userId, tier); + const [requestsPerMinute, requestsPerHour, requestsPerDay] = await Promise.all([ + this.adaptive.adjustLimit(base.requestsPerMinute), + this.adaptive.adjustLimit(base.requestsPerHour), + this.adaptive.adjustLimit(base.requestsPerDay), + ]); + return { - requestsPerMinute: this.adaptive.adjustLimit(base.requestsPerMinute), - requestsPerHour: this.adaptive.adjustLimit(base.requestsPerHour), - requestsPerDay: this.adaptive.adjustLimit(base.requestsPerDay), + requestsPerMinute, + requestsPerHour, + requestsPerDay, }; }