Commit e551e65

alecthomas and claude committed
feat: add weighted fair queuing scheduler
Introduces a scheduler with four layered admission constraints:

- Total concurrency: hard global cap on running jobs
- Priority tiers: weighted share of total concurrency per tier (Priority struct with Level + Weight)
- Per-type limits: fraction of tier slots per job type
- Conflict groups: mutual exclusion by group + job ID

Within a tier, jobs are ordered by accumulated cost per fairness key (lowest first), with cost estimated via EMA of wall time.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6fbc1c6 commit e551e65

6 files changed

Lines changed: 1537 additions & 0 deletions

internal/scheduler/README.md

Lines changed: 247 additions & 0 deletions
# Scheduler Redesign: Weighted Fair Queuing with Conflict Exclusion

## Problem Statement

The current internal async task queue has a max concurrency limit and sub-queues bounded to 1 for synchronisation. This is insufficient for real-world usage:

1. **Client DoS**: A single client initiated 1000 parallel clones, starving all other clients.
2. **Uniform cost assumption**: All jobs are treated equally, but a `linux.git` clone is far more intensive than a `git.git` clone.
3. **No foreground/background interaction**: Synchronous (foreground) work contributes to load, but the scheduler can't account for it because only async (background) jobs go through it.

## Design Overview

![Scheduler flow diagram](scheduler.svg)

All work — foreground and background — goes through a single scheduler. The scheduler uses **cost-based fair queuing** to ensure no single client monopolises the system, with **conflict key exclusion** to prevent unsafe concurrent operations on the same resource.

### Core Concepts

**Job**: The unit of work, identified by `(job_type, job_id)`. Optionally carries a `fairness_key` for foreground work.

**Cost**: A `time.Duration` representing the relative system impact of a job. The scheduler automatically learns the cost of each `(job_type, job_id)` pair using an exponential moving average of observed execution time. On first encounter, a small default (1 second) is used. Callers never specify cost explicitly.

**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, its estimated cost is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next.

**Fairness key**: An opaque string on the job, populated by the caller. For foreground jobs, this is typically the client IP or identity. For background jobs, this is empty. The scheduler doesn't know what it represents — it just uses it for ordering.

**Conflict group**: A named group on the job type config. Two jobs conflict if they share a `job_id` and belong to the same non-empty conflict group. For example, `sync-clone`, `repack`, and `pull` all belong to the `"git"` conflict group — at most one of these can run on a given repo at a time. `snapshot` has no conflict group, so it runs concurrently with anything. This avoids the error-prone per-type conflict lists and ensures symmetry automatically.
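The symmetry this buys can be captured in a single predicate. A minimal sketch, assuming a simplified `Job` shape rather than the package's actual types:

```go
package main

import "fmt"

type ConflictGroup string

type Job struct {
	ID    string
	Group ConflictGroup
}

// conflicts reports whether two jobs are mutually exclusive: they share a
// job ID and belong to the same non-empty conflict group.
func conflicts(a, b Job) bool {
	return a.ID == b.ID && a.Group != "" && a.Group == b.Group
}

func main() {
	repack := Job{ID: "github.com/torvalds/linux", Group: "git"}
	pull := Job{ID: "github.com/torvalds/linux", Group: "git"}
	snapshot := Job{ID: "github.com/torvalds/linux", Group: ""}
	fmt.Println(conflicts(repack, pull))     // true: same repo, same group
	fmt.Println(conflicts(repack, snapshot)) // false: snapshot has no group
}
```

Because the check is symmetric by construction, there is no way to configure A-conflicts-with-B without B-conflicts-with-A.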

**Priority**: A struct with `Level` (dispatch ordering) and `Weight` (share of total concurrency). Priority is a property of a scheduling tier, not an individual job type — multiple job types share the same `Priority` value.

**Total concurrency**: A global cap on the maximum number of concurrently running jobs across all tiers, configured via `Config.TotalConcurrency`.

**Weight**: Each priority tier has a `Weight`. The scheduler divides `TotalConcurrency` among tiers proportionally: a tier's slot allocation is `Weight / sum(all tiers' weights) * TotalConcurrency`. For example, with `TotalConcurrency=50`, a foreground tier with weight 4 and a background tier with weight 1 get 40 and 10 slots respectively.

**Max concurrency (per type)**: A fraction (0–1) of the tier's computed slot allocation that a single job type may consume. For example, `MaxConcurrency: 0.3` on a type in a tier with 10 slots means at most 3 concurrent jobs of that type. The floor is 1, so every registered type can always run at least one job.

### Job Model

```go
// JobType is a named string type for type safety. Constants are defined
// by the application, not the scheduler package, keeping the scheduler
// agnostic to domain concepts like git.
type JobType string
```

### Priority and Job Type Configuration

```go
// Priority defines a scheduling tier with dispatch ordering and a share of
// total concurrency. Two Priority values with the same Level are the same
// tier and must have the same Weight.
type Priority struct {
	Level  int     // higher = dispatched first
	Weight float64 // weight for dividing TotalConcurrency among tiers
}

type ConflictGroup string

type JobTypeConfig struct {
	MaxConcurrency float64       // fraction (0-1) of tier's slots this type may use
	ConflictGroup  ConflictGroup // jobs in same group conflict on same job_id
	Priority       Priority      // scheduling tier this job type belongs to
}
```

Example registrations:

```go
// Application-level constants, not in the scheduler package
var (
	PriorityForeground = scheduler.Priority{Level: 10, Weight: 4}
	PriorityBackground = scheduler.Priority{Level: 5, Weight: 1}
)

const (
	JobTypeSyncClone scheduler.JobType = "sync-clone"
	JobTypeRepack    scheduler.JobType = "repack"
	JobTypePull      scheduler.JobType = "pull"
	JobTypeSnapshot  scheduler.JobType = "snapshot"

	ConflictGroupGit scheduler.ConflictGroup = "git"
)

scheduler.RegisterPriority(PriorityForeground)
scheduler.RegisterPriority(PriorityBackground)

scheduler.RegisterType(JobTypeSyncClone, scheduler.JobTypeConfig{
	MaxConcurrency: 1.0,
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityForeground,
})

scheduler.RegisterType(JobTypeRepack, scheduler.JobTypeConfig{
	MaxConcurrency: 0.3,
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityBackground,
})

scheduler.RegisterType(JobTypePull, scheduler.JobTypeConfig{
	MaxConcurrency: 0.3,
	ConflictGroup:  ConflictGroupGit,
	Priority:       PriorityBackground,
})

// No ConflictGroup — never conflicts with anything
scheduler.RegisterType(JobTypeSnapshot, scheduler.JobTypeConfig{
	MaxConcurrency: 0.5,
	Priority:       PriorityBackground,
})
```

With `TotalConcurrency=50`: foreground gets `4/(4+1) * 50 = 40` slots, background gets `1/(4+1) * 50 = 10` slots. Repack can use at most `0.3 * 10 = 3` of those background slots.
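That arithmetic can be sketched directly in Go. The helper names below are illustrative, not part of the package:

```go
package main

import "fmt"

// tierSlots divides total concurrency among tiers in proportion to weight.
func tierSlots(weight, totalWeight float64, totalConcurrency int) int {
	return int(weight / totalWeight * float64(totalConcurrency))
}

// typeSlots caps a job type to a fraction of its tier's slots, with a floor
// of 1 so every registered type can always run at least one job.
func typeSlots(maxConcurrency float64, tierSlots int) int {
	n := int(maxConcurrency * float64(tierSlots))
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	fg := tierSlots(4, 5, 50) // foreground tier, weight 4 of 5
	bg := tierSlots(1, 5, 50) // background tier, weight 1 of 5
	fmt.Println(fg, bg, typeSlots(0.3, bg)) // 40 10 3
}
```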

### Calling Patterns

Foreground (synchronous) — the caller blocks until the job completes:

```go
func (s *Scheduler) RunSync(
	ctx context.Context,
	jobType JobType,
	jobID string,
	fairnessKey string,
	fn func(ctx context.Context) error,
) error
```

```go
err := scheduler.RunSync(
	ctx,
	JobTypeSyncClone,
	"github.com/torvalds/linux",
	request.RemoteAddr,
	func(ctx context.Context) error {
		return cloneRepo(ctx, "github.com/torvalds/linux")
	},
)
```

Background (async) — returns immediately, job runs when admitted:

```go
func (s *Scheduler) Submit(
	jobType JobType,
	jobID string,
	fn func(ctx context.Context) error,
)
```

```go
scheduler.Submit(
	JobTypeRepack,
	"github.com/torvalds/linux",
	func(ctx context.Context) error {
		return repackRepo(ctx, "github.com/torvalds/linux")
	},
)
```

Both enter the same pending queue and the same admission logic. `RunSync` blocks on a completion signal before returning to the caller.

## Dispatch Algorithm

The entire scheduling algorithm:

```
sort pending jobs by (-priority.level, accumulated_cost[fairness_key], arrival_time)

for each job in sorted order:
    if total_running >= config.total_concurrency → skip
    tier_slots = tier.weight / sum(all tier weights) * total_concurrency
    if count(running where priority.level == job.priority.level) >= tier_slots → skip
    type_slots = max(1, int(type.max_concurrency * tier_slots))
    if type_running_count >= type_slots → skip
    if any running job has same job_id AND same non-empty conflict_group → skip
    admit job
    estimated_cost = cost_estimates[(job_type, job_id)] or 1s
    accumulated_cost[fairness_key] += estimated_cost
```
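The admission checks in the loop above, rendered as a condensed Go sketch. The struct shapes here are assumptions for illustration, not the committed implementation:

```go
package main

import "fmt"

type Tier struct {
	Level  int
	Weight float64
}

type TypeConfig struct {
	Name           string
	MaxConcurrency float64
	Group          string
	Tier           Tier
}

type Job struct {
	Type TypeConfig
	ID   string
}

type State struct {
	TotalConcurrency int
	TotalWeight      float64 // sum of all tier weights
	Running          []Job
}

// admissible applies the four layered constraints in order: global cap,
// tier share, per-type cap, and conflict exclusion.
func (s *State) admissible(j Job) bool {
	if len(s.Running) >= s.TotalConcurrency {
		return false // global cap reached
	}
	tierSlots := int(j.Type.Tier.Weight / s.TotalWeight * float64(s.TotalConcurrency))
	tierRunning, typeRunning := 0, 0
	for _, r := range s.Running {
		if r.Type.Tier.Level == j.Type.Tier.Level {
			tierRunning++
		}
		if r.Type.Name == j.Type.Name {
			typeRunning++
		}
		if r.ID == j.ID && j.Type.Group != "" && r.Type.Group == j.Type.Group {
			return false // conflict exclusion on the same resource
		}
	}
	if tierRunning >= tierSlots {
		return false // tier share exhausted
	}
	typeSlots := int(j.Type.MaxConcurrency * float64(tierSlots))
	if typeSlots < 1 {
		typeSlots = 1 // floor of one slot per registered type
	}
	return typeRunning < typeSlots
}

func main() {
	bg := Tier{Level: 5, Weight: 1}
	repack := TypeConfig{Name: "repack", MaxConcurrency: 0.3, Group: "git", Tier: bg}
	s := &State{TotalConcurrency: 50, TotalWeight: 5,
		Running: []Job{{Type: repack, ID: "linux.git"}}}
	fmt.Println(s.admissible(Job{Type: repack, ID: "linux.git"})) // false: conflict
	fmt.Println(s.admissible(Job{Type: repack, ID: "git.git"}))   // true
}
```

A skipped job stays pending and is re-evaluated on the next pass, which is what gives the "no head-of-line blocking" property below.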

When a job completes:

```
elapsed = wall time since job started
cost_estimates[(job_type, job_id)] = α * elapsed + (1-α) * cost_estimates[(job_type, job_id)]
re-evaluate pending queue for newly admissible jobs
```

Key properties of this algorithm:

- **Total concurrency**: a hard global cap prevents the system from being overloaded regardless of tier configuration.
- **Priority**: foreground always dispatched before background. Background only runs in capacity not used by foreground.
- **Proportional tier allocation**: each priority tier gets a share of total concurrency based on its `Weight` relative to all other tiers' weights. This makes configuration scale-independent — changing `TotalConcurrency` adjusts all tiers proportionally.
- **Per-type limits**: within a tier, individual job types can be capped to a fraction of the tier's allocation, preventing expensive operations from monopolising the tier.
- **Fairness**: within a priority level, jobs from the fairness key with the lowest accumulated cost go first. A client that has consumed a lot of capacity yields to one that has consumed little.
- **Cost-awareness**: expensive jobs advance accumulated cost faster, so they naturally yield to cheaper work from other clients. A `linux.git` clone that takes 60 seconds advances the client's accumulated cost by ~60s, while a `git.git` clone that takes 5 seconds advances it by ~5s.
- **Adaptive**: the scheduler automatically learns the cost of each `(job_type, job_id)` pair. No manual cost tuning required. After one execution, estimates are already meaningful.
- **Conflict safety**: conflicting jobs on the same resource stay in the pending queue, not consuming concurrency slots while they wait.
- **No head-of-line blocking**: if the next job by ordering is blocked (conflict or concurrency limit), the scheduler skips it and admits the next admissible job.

## Cost Estimation

The scheduler maintains an exponential moving average of observed wall time per `(job_type, job_id)`:

```
estimatedCost = α * observedWallTime + (1-α) * estimatedCost
```

`α` is the smoothing factor (0–1) controlling how quickly estimates adapt. `α = 0.3` is a reasonable default — it converges to the true value within a handful of runs, while remaining stable against outliers (e.g., a single slow clone due to network congestion won't drastically inflate the estimate). It should be a configurable constant.

Wall time directly measures the resource being rationed — how long a job holds a concurrency slot. On first encounter of a `(job_type, job_id)` pair, a small default (1 second) is used. After one execution, the estimate is based on real data.

The estimates map needs TTL-based cleanup, same as the accumulated cost map. Estimates could optionally be persisted across restarts to avoid cold-start inaccuracy, using the existing persistence layer.

## Accumulated Cost Lifecycle

The accumulated cost map needs periodic cleanup since fairness keys (client IPs) are ephemeral — thousands of agentic workstations may spin up and down. Options:

- **TTL-based eviction**: remove entries not seen for N minutes.
- **Periodic reset**: zero all counters every N minutes.
- **Advance idle keys**: when a key is seen again after being idle, advance it to the current global minimum (prevents penalising returning clients, prevents exploiting fresh counters).

Start with TTL-based eviction and refine based on production behaviour.

## Go Implementation Notes

### Building Blocks

- `golang.org/x/sync/semaphore` — *not* needed. The weighted semaphore approach was considered and rejected in favour of simple concurrency counting, which avoids starvation issues with high-cost jobs.
- `container/heap` — useful for the priority queue ordering.
- `sync.Cond` or a channel — for waking the dispatch loop when a job completes.

### Synchronisation Concern

The previous implementation conflated synchronisation (mutex per resource) with scheduling. In this design, synchronisation is handled by conflict groups within the scheduler. There is no external mutex — the scheduler itself ensures jobs in the same conflict group don't run concurrently on the same resource by keeping them in the pending queue.

### Persistence

The existing persistence layer for scheduled jobs (recording last execution time to avoid thundering herd on restart) remains unchanged. It's orthogonal to the scheduling algorithm.

### Prior Art

The Kubernetes API Priority and Fairness system (`k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing`) solves a very similar problem for the kube-apiserver. It uses priority levels, flow distinguishers (fairness keys), shuffle sharding, and work estimation in "seats" (cost). The KEP is worth reading for context:

https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md

The k8s implementation is too coupled to the apiserver to use as a library, but the design concepts directly informed this approach. Our design is simpler: no shuffle sharding (explicit fairness keys instead), no dynamic reconfiguration, and we add conflict key exclusion which k8s doesn't have.

internal/scheduler/metrics.go

Lines changed: 25 additions & 0 deletions
```go
package scheduler

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"

	"github.com/block/cachew/internal/metrics"
)

type schedulerMetrics struct {
	pendingJobs metric.Int64Gauge
	runningJobs metric.Int64Gauge
	jobsTotal   metric.Int64Counter
	jobDuration metric.Float64Histogram
}

func newSchedulerMetrics() *schedulerMetrics {
	meter := otel.Meter("cachew.scheduler")
	return &schedulerMetrics{
		pendingJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.pending_jobs", "{jobs}", "Number of jobs waiting in the pending queue"),
		runningJobs: metrics.NewMetric[metric.Int64Gauge](meter, "cachew.scheduler.running_jobs", "{jobs}", "Number of jobs currently executing"),
		jobsTotal:   metrics.NewMetric[metric.Int64Counter](meter, "cachew.scheduler.jobs_total", "{jobs}", "Total number of completed scheduler jobs"),
		jobDuration: metrics.NewMetric[metric.Float64Histogram](meter, "cachew.scheduler.job_duration_seconds", "s", "Duration of scheduler jobs in seconds"),
	}
}
```
