Skip to content

Commit 72a39f1

Browse files
author
Alexandru Adam
committed
Fix compactor concurrency limit not enforced across compaction levels
1 parent 45b52d3 commit 72a39f1

6 files changed

Lines changed: 332 additions & 4 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [BUGFIX] Compactor: Fix compaction concurrency limit not being respected across compaction levels. When `compaction-concurrency` was set to 1, multiple compactions (e.g., 12h and 24h) could still run in the same pass due to the BucketCompactor loop calling `Groups()` repeatedly. The grouper now tracks cumulative groups returned and enforces the limit across calls. #7298
45
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
56
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
67
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727

pkg/compactor/compactor_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/oklog/ulid/v2"
2323
"github.com/pkg/errors"
2424
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/prometheus/client_golang/prometheus/promauto"
2526
prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
2627
"github.com/prometheus/prometheus/model/labels"
2728
"github.com/prometheus/prometheus/tsdb"
@@ -2449,3 +2450,127 @@ func TestCompactor_UserIndexUpdateLoop(t *testing.T) {
24492450
}
24502451
}
24512452
}
2453+
2454+
func TestCompactor_ShouldRespectConcurrencyLimitAcrossCompactionLevels(t *testing.T) {
2455+
block0hto2hUlid := ulid.MustNew(301, nil)
2456+
block2hto4hUlid := ulid.MustNew(302, nil)
2457+
block4hto6hUlid := ulid.MustNew(303, nil)
2458+
block6hto8hUlid := ulid.MustNew(304, nil)
2459+
block8hto10hUlid := ulid.MustNew(305, nil)
2460+
block10hto12hUlid := ulid.MustNew(306, nil)
2461+
block24hto36hUlid := ulid.MustNew(307, nil)
2462+
block36hto48hUlid := ulid.MustNew(308, nil)
2463+
2464+
h := time.Hour.Milliseconds()
2465+
2466+
blocks := map[ulid.ULID]*metadata.Meta{
2467+
block0hto2hUlid: {
2468+
BlockMeta: tsdb.BlockMeta{ULID: block0hto2hUlid, MinTime: 0, MaxTime: 2 * h},
2469+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2470+
},
2471+
block2hto4hUlid: {
2472+
BlockMeta: tsdb.BlockMeta{ULID: block2hto4hUlid, MinTime: 2 * h, MaxTime: 4 * h},
2473+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2474+
},
2475+
block4hto6hUlid: {
2476+
BlockMeta: tsdb.BlockMeta{ULID: block4hto6hUlid, MinTime: 4 * h, MaxTime: 6 * h},
2477+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2478+
},
2479+
block6hto8hUlid: {
2480+
BlockMeta: tsdb.BlockMeta{ULID: block6hto8hUlid, MinTime: 6 * h, MaxTime: 8 * h},
2481+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2482+
},
2483+
block8hto10hUlid: {
2484+
BlockMeta: tsdb.BlockMeta{ULID: block8hto10hUlid, MinTime: 8 * h, MaxTime: 10 * h},
2485+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2486+
},
2487+
block10hto12hUlid: {
2488+
BlockMeta: tsdb.BlockMeta{ULID: block10hto12hUlid, MinTime: 10 * h, MaxTime: 12 * h},
2489+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2490+
},
2491+
block24hto36hUlid: {
2492+
BlockMeta: tsdb.BlockMeta{ULID: block24hto36hUlid, MinTime: 24 * h, MaxTime: 36 * h},
2493+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2494+
},
2495+
block36hto48hUlid: {
2496+
BlockMeta: tsdb.BlockMeta{ULID: block36hto48hUlid, MinTime: 36 * h, MaxTime: 48 * h},
2497+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
2498+
},
2499+
}
2500+
2501+
compactorCfg := &Config{
2502+
BlockRanges: cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
2503+
}
2504+
2505+
limits := &validation.Limits{}
2506+
overrides := validation.NewOverrides(*limits, nil)
2507+
2508+
rs := ring.ReplicationSet{
2509+
Instances: []ring.InstanceDesc{
2510+
{Addr: "test-addr"},
2511+
},
2512+
}
2513+
subring := &ring.RingMock{}
2514+
subring.On("GetAllHealthy", mock.Anything).Return(rs, nil)
2515+
2516+
r := &ring.RingMock{}
2517+
r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil)
2518+
2519+
registerer := prometheus.NewPedanticRegistry()
2520+
blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{
2521+
Name: "cortex_compactor_block_visit_marker_read_failed",
2522+
})
2523+
blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{
2524+
Name: "cortex_compactor_block_visit_marker_write_failed",
2525+
})
2526+
2527+
bkt := &bucket.ClientMock{}
2528+
bkt.MockUpload(mock.Anything, nil)
2529+
bkt.MockGet(mock.Anything, "", nil)
2530+
2531+
metrics := newCompactorMetrics(registerer)
2532+
2533+
noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
2534+
return nil
2535+
}
2536+
2537+
ctx := t.Context()
2538+
g := NewShuffleShardingGrouper(
2539+
ctx,
2540+
nil,
2541+
objstore.WithNoopInstr(bkt),
2542+
false,
2543+
true,
2544+
nil,
2545+
metadata.NoneFunc,
2546+
metrics.getSyncerMetrics("test-user"),
2547+
metrics,
2548+
*compactorCfg,
2549+
r,
2550+
"test-addr",
2551+
"test-compactor",
2552+
overrides,
2553+
"test-user",
2554+
10,
2555+
3,
2556+
1, // concurrency = 1
2557+
5*time.Minute,
2558+
blockVisitMarkerReadFailed,
2559+
blockVisitMarkerWriteFailed,
2560+
noCompactFilter,
2561+
)
2562+
2563+
totalGroupsAcrossCalls := 0
2564+
for i := 0; i < 5; i++ {
2565+
result, err := g.Groups(blocks)
2566+
require.NoError(t, err)
2567+
totalGroupsAcrossCalls += len(result)
2568+
if len(result) == 0 {
2569+
break
2570+
}
2571+
}
2572+
2573+
require.Equal(t, 1, totalGroupsAcrossCalls,
2574+
"with concurrency=1, the grouper should return at most 1 group total across multiple Groups() calls, "+
2575+
"but got %d (bug: BucketCompactor loop causes cascading compactions at different levels)", totalGroupsAcrossCalls)
2576+
}

pkg/compactor/partition_compaction_grouper.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type PartitionCompactionGrouper struct {
4545
blockFilesConcurrency int
4646
blocksFetchConcurrency int
4747
compactionConcurrency int
48+
totalGroupsPlanned int
4849

4950
doRandomPick bool
5051

@@ -114,6 +115,11 @@ func NewPartitionCompactionGrouper(
114115

115116
// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
116117
func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
118+
remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned
119+
if remainingConcurrency <= 0 {
120+
return nil, nil
121+
}
122+
117123
// Check if this compactor is on the subring.
118124
// If the compactor is not on the subring when using the userID as a identifier
119125
// no plans generated below will be owned by the compactor so we can just return an empty array
@@ -140,7 +146,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta)
140146
return nil, errors.Wrap(err, "unable to generate compaction jobs")
141147
}
142148

143-
pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs)
149+
pickedPartitionCompactionJobs := g.pickPartitionCompactionJob(partitionCompactionJobs, remainingConcurrency)
150+
g.totalGroupsPlanned += len(pickedPartitionCompactionJobs)
144151

145152
return pickedPartitionCompactionJobs, nil
146153
}
@@ -622,7 +629,7 @@ func (g *PartitionCompactionGrouper) handleEmptyPartition(partitionedGroupInfo *
622629
return nil
623630
}
624631

625-
func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition) []*compact.Group {
632+
func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompactionJobs []*blocksGroupWithPartition, remainingConcurrency int) []*compact.Group {
626633
var outGroups []*compact.Group
627634
for _, partitionedGroup := range partitionCompactionJobs {
628635
groupHash := partitionedGroup.groupHash
@@ -697,7 +704,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
697704

698705
outGroups = append(outGroups, thanosGroup)
699706
level.Debug(partitionedGroupLogger).Log("msg", "added partition to compaction groups")
700-
if len(outGroups) >= g.compactionConcurrency {
707+
if len(outGroups) >= remainingConcurrency {
701708
break
702709
}
703710
}

pkg/compactor/partition_compaction_grouper_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,3 +2253,93 @@ type expectedCompactionJob struct {
22532253
rangeStart int64
22542254
rangeEnd int64
22552255
}
2256+
2257+
func TestPartitionCompactionGrouper_ShouldRespectCumulativeConcurrencyLimit(t *testing.T) {
2258+
block1 := ulid.MustNew(201, nil)
2259+
block2 := ulid.MustNew(202, nil)
2260+
block3 := ulid.MustNew(203, nil)
2261+
block4 := ulid.MustNew(204, nil)
2262+
2263+
userID := "test-user"
2264+
testCompactorID := "test-compactor"
2265+
2266+
blocks := map[ulid.ULID]*metadata.Meta{
2267+
block1: {
2268+
BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}},
2269+
},
2270+
block2: {
2271+
BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}},
2272+
},
2273+
block3: {
2274+
BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}},
2275+
},
2276+
block4: {
2277+
BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}},
2278+
},
2279+
}
2280+
2281+
compactorCfg := &Config{
2282+
BlockRanges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
2283+
}
2284+
2285+
limits := &validation.Limits{}
2286+
overrides := validation.NewOverrides(*limits, nil)
2287+
2288+
rs := ring.ReplicationSet{
2289+
Instances: []ring.InstanceDesc{
2290+
{Addr: "test-addr"},
2291+
},
2292+
}
2293+
subring := &ring.RingMock{}
2294+
subring.On("GetAllHealthy", mock.Anything).Return(rs, nil)
2295+
subring.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rs, nil)
2296+
2297+
r := &ring.RingMock{}
2298+
r.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil)
2299+
2300+
registerer := prometheus.NewPedanticRegistry()
2301+
metrics := newCompactorMetrics(registerer)
2302+
2303+
noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
2304+
return nil
2305+
}
2306+
2307+
bkt := &bucket.ClientMock{}
2308+
bkt.MockUpload(mock.Anything, nil)
2309+
bkt.MockGet(mock.Anything, "", nil)
2310+
bkt.MockIter(mock.Anything, nil, nil)
2311+
2312+
ctx := t.Context()
2313+
g := NewPartitionCompactionGrouper(
2314+
ctx,
2315+
nil,
2316+
objstore.WithNoopInstr(bkt),
2317+
false,
2318+
true,
2319+
nil,
2320+
metrics.getSyncerMetrics(userID),
2321+
metrics,
2322+
metadata.NoneFunc,
2323+
*compactorCfg,
2324+
r,
2325+
"test-addr",
2326+
testCompactorID,
2327+
overrides,
2328+
userID,
2329+
10,
2330+
3,
2331+
1, // concurrency = 1
2332+
false,
2333+
5*time.Minute,
2334+
noCompactFilter,
2335+
1,
2336+
)
2337+
2338+
result1, err := g.Groups(blocks)
2339+
require.NoError(t, err)
2340+
require.Len(t, result1, 1, "first Groups() call should return exactly 1 group")
2341+
2342+
result2, err := g.Groups(blocks)
2343+
require.NoError(t, err)
2344+
require.Len(t, result2, 0, "second Groups() call should return 0 groups when cumulative concurrency limit is reached")
2345+
}

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type ShuffleShardingGrouper struct {
3838
blockFilesConcurrency int
3939
blocksFetchConcurrency int
4040
compactionConcurrency int
41+
totalGroupsPlanned int
4142

4243
ring ring.ReadRing
4344
ringLifecyclerAddr string
@@ -106,6 +107,11 @@ func NewShuffleShardingGrouper(
106107

107108
// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
108109
func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
110+
remainingConcurrency := g.compactionConcurrency - g.totalGroupsPlanned
111+
if remainingConcurrency <= 0 {
112+
return nil, nil
113+
}
114+
109115
noCompactMarked := g.noCompBlocksFunc()
110116
// First of all we have to group blocks using the Thanos default
111117
// grouping (based on downsample resolution + external labels).
@@ -248,11 +254,12 @@ mainLoop:
248254
}
249255

250256
outGroups = append(outGroups, thanosGroup)
251-
if len(outGroups) == g.compactionConcurrency {
257+
if len(outGroups) == remainingConcurrency {
252258
break mainLoop
253259
}
254260
}
255261

262+
g.totalGroupsPlanned += len(outGroups)
256263
level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups)))
257264

258265
return outGroups, nil

0 commit comments

Comments
 (0)