
Commit 235e5b2

alecthomas and claude committed
feat(scheduler): add admission cost, dedup, and RunSync cancellation
Add per-job admission cost floor to prevent volume-based DoS by charging a minimum fairness cost regardless of actual job duration.

Deduplicate jobs by (type, id): Submit silently drops duplicates, RunSync coalesces callers onto the existing job. Sync and async jobs can be mixed on the same key. When all RunSync waiters cancel and no Submit owns the job, the job's context is cancelled.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 95e934d commit 235e5b2

5 files changed

Lines changed: 391 additions & 75 deletions


internal/scheduler/README.md

Lines changed: 16 additions & 2 deletions
@@ -20,7 +20,9 @@ All work — foreground and background — goes through a single scheduler. The

 **Cost**: A `time.Duration` representing the relative system impact of a job. The scheduler automatically learns the cost of each `(job_type, job_id)` pair using an exponential moving average of observed execution time. On first encounter, a small default (1 second) is used. Callers never specify cost explicitly.

-**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, its estimated cost is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next.
+**Admission cost**: A configurable minimum fairness cost charged per job admission (`Config.AdmissionCost`, default 5 seconds). At admission time, the fairness charge is `max(estimated_cost, admission_cost)`. This prevents a client from monopolising the scheduler by submitting many cheap jobs — even a job that completes in 100ms still costs 5 seconds of fairness budget, so mass submission is expensive in fairness terms without affecting actual execution time.
+
+**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, the admission charge (the greater of the estimated cost and the configured minimum) is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next.

 **Fairness key**: An opaque string on the job, populated by the caller. For foreground jobs, this is typically the client IP or identity. For background jobs, this is empty. The scheduler doesn't know what it represents — it just uses it for ordering.

@@ -159,6 +161,17 @@ scheduler.Submit(

 Both enter the same pending queue and the same admission logic. `RunSync` blocks on a completion signal before returning to the caller.

+### Deduplication
+
+Jobs are deduplicated by `(job_type, job_id)`. If a job with the same key is already pending or running:
+
+- **`Submit`**: the call is silently dropped. The existing job's `fn` will be used.
+- **`RunSync`**: the caller coalesces onto the existing job and receives its result when it completes. Multiple callers can wait on the same job simultaneously.
+
+Sync and async jobs can be mixed for the same `(job_type, job_id)`. A `RunSync` onto an active `Submit` job attaches a waiter. A `Submit` onto an active `RunSync` job marks it as submitted so it survives waiter cancellation.
+
+This means if ten clients request the same clone concurrently, only one clone runs — all ten callers receive the result. If a coalesced `RunSync` caller's context is cancelled, that caller is detached but the job continues for the remaining waiters. When **all** waiters cancel and no `Submit` created the job, the job itself is cancelled via its context — both pending and running jobs are stopped.
+
 ## Dispatch Algorithm

 The entire scheduling algorithm:
@@ -174,7 +187,7 @@ for each job in sorted order:
   if type_running_count >= type_slots → skip
   if any running job has same job_id AND same non-empty conflict_group → skip
   admit job
-  estimated_cost = cost_estimates[(job_type, job_id)] or 1s
+  estimated_cost = max(cost_estimates[(job_type, job_id)] or 1s, config.admission_cost)
   accumulated_cost[fairness_key] += estimated_cost
 ```

@@ -194,6 +207,7 @@ Key properties of this algorithm:
 - **Per-type limits**: within a tier, individual job types can be capped to a fraction of the tier's allocation, preventing expensive operations from monopolising the tier.
 - **Fairness**: within a priority level, jobs from the fairness key with the lowest accumulated cost go first. A client that has consumed a lot of capacity yields to one that has consumed little.
 - **Cost-awareness**: expensive jobs advance accumulated cost faster, so they naturally yield to cheaper work from other clients. A `linux.git` clone that takes 60 seconds advances the client's accumulated cost by ~60s, while a `git.git` clone that takes 5 seconds advances it by ~5s.
+- **Volume-awareness**: the admission cost floor ensures that submitting many cheap jobs is still expensive in fairness terms. A client submitting 1000 trivial jobs accumulates at least `1000 * admission_cost`, causing it to yield to clients that have submitted fewer jobs.
 - **Adaptive**: the scheduler automatically learns the cost of each `(job_type, job_id)` pair. No manual cost tuning required. After one execution, estimates are already meaningful.
 - **Conflict safety**: conflicting jobs on the same resource stay in the pending queue, not consuming concurrency slots while they wait.
 - **No head-of-line blocking**: if the next job by ordering is blocked (conflict or concurrency limit), the scheduler skips it and admits the next admissible job.

internal/scheduler/scheduler.go

Lines changed: 94 additions & 23 deletions
@@ -75,6 +75,15 @@ type Config struct {
 	CostTTL         time.Duration `hcl:"cost-ttl,optional" help:"TTL for cost estimate entries." default:"1h"`
 	FairnessTTL     time.Duration `hcl:"fairness-ttl,optional" help:"TTL for accumulated cost entries." default:"10m"`
 	CleanupInterval time.Duration `hcl:"cleanup-interval,optional" help:"How often to run TTL cleanup." default:"1m"`
+	// AdmissionCost is the minimum fairness cost charged per job admission, regardless of the
+	// job's estimated duration. This prevents a client from monopolising the scheduler by
+	// submitting many cheap jobs.
+	AdmissionCost time.Duration `hcl:"admission-cost,optional" help:"Minimum fairness cost per job admission." default:"5s"`
+}
+
+type dedupKey struct {
+	jobType JobType
+	jobID   string
 }

 type job struct {
@@ -83,7 +92,10 @@ type job struct {
 	fairnessKey string
 	fn          func(ctx context.Context) error
 	arrivalTime time.Time
-	done        chan error // non-nil for RunSync
+	submitted   bool         // true if created or referenced by Submit
+	waiters     []chan error // RunSync callers waiting for the result
+	ctx         context.Context    // context passed to fn; cancellable for RunSync jobs
+	cancel      context.CancelFunc // non-nil for RunSync jobs
 }

 func (j *job) String() string { return j.jobType.String() + ":" + j.jobID }
@@ -115,6 +127,7 @@ type Scheduler struct {
 	types    map[JobType]JobTypeConfig
 	pending  []*job
 	running  []*runningJob
+	active   map[dedupKey]*job // all pending + running jobs for dedup
 	fairness map[string]*fairnessEntry
 	costs    map[costKey]*costEntry
 	config   Config
@@ -142,6 +155,7 @@ func New(ctx context.Context, config Config, ns *metadatadb.Namespace) (*Schedul
 	s := &Scheduler{
 		priorities:    make(map[Priority]bool),
 		types:         make(map[JobType]JobTypeConfig),
+		active:        make(map[dedupKey]*job),
 		fairness:      make(map[string]*fairnessEntry),
 		costs:         make(map[costKey]*costEntry),
 		lastRunsLocal: make(map[string]time.Time),
@@ -199,55 +213,73 @@ func (s *Scheduler) validateType(jt JobType) {
 }

 // Submit queues a background job for async execution. Returns immediately.
+// If a job with the same (type, id) is already pending or running, the
+// submission is silently deduplicated.
 func (s *Scheduler) Submit(jobType JobType, jobID string, fn func(ctx context.Context) error) {
 	s.mu.Lock()
 	s.validateType(jobType)
-	s.pending = append(s.pending, &job{
+	key := dedupKey{jobType, jobID}
+	if existing, ok := s.active[key]; ok {
+		existing.submitted = true
+		s.mu.Unlock()
+		return
+	}
+	j := &job{
 		jobType:     jobType,
 		jobID:       jobID,
 		fn:          fn,
 		arrivalTime: s.now(),
-	})
+		submitted:   true,
+	}
+	s.active[key] = j
+	s.pending = append(s.pending, j)
 	s.mu.Unlock()
 	s.signal()
 }

 // RunSync submits a foreground job and blocks until it completes or ctx is
-// cancelled. The fn receives a context that is cancelled when either the
-// caller's ctx or the scheduler's context is done.
+// cancelled. If a job with the same (type, id) is already pending or running,
+// the caller coalesces onto the existing job and receives its result. When all
+// coalesced callers cancel, the job itself is cancelled.
 func (s *Scheduler) RunSync(ctx context.Context, jobType JobType, jobID, fairnessKey string, fn func(ctx context.Context) error) error {
-	jobCtx, jobCancel := context.WithCancel(ctx)
-	stop := context.AfterFunc(s.ctx, jobCancel)
+	done := make(chan error, 1)

 	s.mu.Lock()
 	s.validateType(jobType)
-	s.mu.Unlock()
-
-	done := make(chan error, 1)
+	key := dedupKey{jobType, jobID}
+	if existing, ok := s.active[key]; ok {
+		existing.waiters = append(existing.waiters, done)
+		s.mu.Unlock()
+		return s.awaitDone(ctx, existing, done)
+	}
+	jobCtx, jobCancel := context.WithCancel(s.ctx)
 	j := &job{
 		jobType:     jobType,
 		jobID:       jobID,
 		fairnessKey: fairnessKey,
-		fn:          func(_ context.Context) error { return fn(jobCtx) },
+		fn:          fn,
 		arrivalTime: s.now(),
-		done:        done,
+		waiters:     []chan error{done},
+		ctx:         jobCtx,
+		cancel:      jobCancel,
 	}
-	s.mu.Lock()
+	s.active[key] = j
 	s.pending = append(s.pending, j)
 	s.mu.Unlock()
 	s.signal()

+	return s.awaitDone(ctx, j, done)
+}
+
+func (s *Scheduler) awaitDone(ctx context.Context, j *job, done chan error) error {
 	select {
 	case err := <-done:
-		stop()
-		jobCancel()
 		return err
 	case <-ctx.Done():
 		s.mu.Lock()
-		s.removePendingLocked(j)
+		s.removeWaiterLocked(j, done)
+		s.maybeRemoveJobLocked(j)
 		s.mu.Unlock()
-		stop()
-		jobCancel()
 		return errors.WithStack(ctx.Err())
 	}
 }
@@ -383,7 +415,11 @@ func (s *Scheduler) tierSlotsLocked(weight float64) int {
 func (s *Scheduler) executeJob(j *job) {
 	start := s.now()
 	s.logger.InfoContext(s.ctx, "Starting job", "job", j)
-	err := j.fn(s.ctx)
+	fnCtx := s.ctx
+	if j.ctx != nil {
+		fnCtx = j.ctx
+	}
+	err := j.fn(fnCtx)
 	elapsed := s.now().Sub(start)

 	if err != nil {
@@ -395,11 +431,14 @@ func (s *Scheduler) executeJob(j *job) {
 	s.mu.Lock()
 	s.updateCostEstimateLocked(j.jobType, j.jobID, elapsed)
 	s.removeFromRunningLocked(j)
+	delete(s.active, dedupKey{j.jobType, j.jobID})
+	waiters := j.waiters
+	j.waiters = nil
 	s.recordMetricsLocked()
 	s.mu.Unlock()

-	if j.done != nil {
-		j.done <- err
+	for _, w := range waiters {
+		w <- err
 	}
 	s.signal()
 }
@@ -450,10 +489,11 @@ func (s *Scheduler) hasConflictLocked(j *job) bool {
 const defaultCost = time.Second

 func (s *Scheduler) estimatedCostLocked(j *job) time.Duration {
+	est := defaultCost
 	if entry, ok := s.costs[costKey{j.jobType, j.jobID}]; ok {
-		return entry.estimate
+		est = entry.estimate
 	}
-	return defaultCost
+	return max(est, s.config.AdmissionCost)
 }

 func (s *Scheduler) updateCostEstimateLocked(jt JobType, jobID string, elapsed time.Duration) {
@@ -530,6 +570,37 @@ func (s *Scheduler) removePendingLocked(j *job) {
 	s.pending = slices.DeleteFunc(s.pending, func(pj *job) bool { return pj == j })
 }

+func (s *Scheduler) removeWaiterLocked(j *job, done chan error) {
+	j.waiters = slices.DeleteFunc(j.waiters, func(w chan error) bool { return w == done })
+}
+
+func (s *Scheduler) isRunningLocked(j *job) bool {
+	for _, rj := range s.running {
+		if rj.job == j {
+			return true
+		}
+	}
+	return false
+}
+
+// maybeRemoveJobLocked is called when a RunSync waiter is removed. If no
+// waiters remain it cancels the job's context. Pending jobs are also removed
+// from the queue and active map; running jobs are left for executeJob to
+// clean up after the fn returns.
+func (s *Scheduler) maybeRemoveJobLocked(j *job) {
+	if len(j.waiters) > 0 || j.submitted {
+		return
+	}
+	if j.cancel != nil {
+		j.cancel()
+	}
+	if s.isRunningLocked(j) {
+		return
+	}
+	s.removePendingLocked(j)
+	delete(s.active, dedupKey{j.jobType, j.jobID})
+}
+
 // --- TTL cleanup ---

 func (s *Scheduler) cleanupLoop() {
