Skip to content

Commit 5f37b7b

Browse files
authored
fix: harden ingesting autoscalers around task-count boundaries (#19269)
1 parent e4f3008 commit 5f37b7b

8 files changed

Lines changed: 331 additions & 39 deletions

File tree

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public static Stream<Arguments> getCompactionSupervisorTestParams()
153153
}
154154

155155
@Test
156-
@Timeout(20)
156+
@Timeout(120)
157157
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
158158
{
159159
final int maxRowsPerSegment = 1000;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,18 @@ public SeekableStreamSupervisorIOConfig(
8888
this.lagAggregator = lagAggregator;
8989
// Could be null
9090
this.autoScalerConfig = autoScalerConfig;
91-
this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler();
91+
boolean isAutoScalerAvailable = autoScalerConfig != null;
92+
this.autoScalerEnabled = isAutoScalerAvailable && autoScalerConfig.getEnableTaskAutoScaler();
9293
if (autoScalerEnabled) {
9394
// Priority: taskCountStart > taskCount > taskCountMin
9495
this.taskCount = Configs.valueOrDefault(
9596
autoScalerConfig.getTaskCountStart(),
9697
Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
9798
);
99+
} else if (isAutoScalerAvailable) {
100+
this.taskCount = Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin());
98101
} else {
99-
this.taskCount = taskCount != null ? taskCount : 1;
102+
this.taskCount = Configs.valueOrDefault(taskCount, 1);
100103
}
101104
Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
102105
"stopTaskCount must be greater than 0");

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,27 @@ public int computeTaskCountForScaleAction()
174174
lastKnownMetrics = collectMetrics();
175175

176176
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
177-
final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
177+
int currentTaskCount = supervisor.getIoConfig().getTaskCount();
178+
179+
// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
180+
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
181+
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
182+
|| currentTaskCount > config.getTaskCountMax();
183+
if (isTaskCountOutOfBounds) {
184+
currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
185+
}
178186

179187
// Perform scale-up actions; scale-down actions only if configured.
180188
final int taskCount;
181-
if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
189+
190+
// If task count is out of bounds, scale to the configured boundary
191+
// regardless of optimal task count, to get back to a safe state.
192+
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
193+
taskCount = currentTaskCount;
194+
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
195+
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
196+
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
197+
} else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
182198
taskCount = optimalTaskCount;
183199
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
184200
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
@@ -192,6 +208,24 @@ && isScaleActionAllowed()
192208
} else {
193209
taskCount = -1;
194210
log.debug("No scaling required for supervisor[%s]", supervisorId);
211+
212+
// Emit metrics for scaling skip reasons; in case of min == max, signaling reaching
213+
// max task count has bigger priority for the external observers / trackers
214+
if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == config.getTaskCountMax()) {
215+
emitter.emit(getMetricBuilder()
216+
.setDimension(
217+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
218+
"Already at max task count"
219+
)
220+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
221+
} else if (optimalTaskCount == config.getTaskCountMin() || currentTaskCount == config.getTaskCountMin()) {
222+
emitter.emit(getMetricBuilder()
223+
.setDimension(
224+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
225+
"Already at min task count"
226+
)
227+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
228+
}
195229
}
196230
return taskCount;
197231
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
2121

22+
import com.google.common.annotations.VisibleForTesting;
2223
import org.apache.commons.collections4.queue.CircularFifoQueue;
2324
import org.apache.druid.error.DruidException;
2425
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
@@ -212,7 +213,8 @@ private Runnable computeAndCollectLag()
212213
* @param lags the lag metrics of Stream (Kafka/Kinesis)
213214
* @return Integer, target number of tasksCount. -1 means skip scale action.
214215
*/
215-
private int computeDesiredTaskCount(List<Long> lags)
216+
@VisibleForTesting
217+
int computeDesiredTaskCount(List<Long> lags)
216218
{
217219
// if the supervisor is not suspended, ensure required tasks are running
218220
// if suspended, ensure tasks have been requested to gracefully stop
@@ -239,19 +241,30 @@ private int computeDesiredTaskCount(List<Long> lags)
239241
withinProportion, spec.getId()
240242
);
241243

242-
int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
244+
int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
243245
int desiredActiveTaskCount;
244-
int partitionCount = supervisor.getPartitionCount();
246+
final int partitionCount = supervisor.getPartitionCount();
245247
if (partitionCount <= 0) {
246248
log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId());
247249
return -1;
248250
}
249251

252+
final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
253+
final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
254+
255+
// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
256+
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
257+
// If that is happening, take the bound and return early.
258+
final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin
259+
|| currentActiveTaskCount > actualTaskCountMax;
260+
if (isTaskCountOutOfBounds) {
261+
currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount));
262+
return currentActiveTaskCount;
263+
}
264+
250265
if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
251266
// Do Scale out
252-
int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
253-
254-
int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
267+
final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
255268
if (currentActiveTaskCount == actualTaskCountMax) {
256269
log.debug(
257270
"CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].",
@@ -272,8 +285,7 @@ private int computeDesiredTaskCount(List<Long> lags)
272285

273286
if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
274287
// Do Scale in
275-
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
276-
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
288+
final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
277289
if (currentActiveTaskCount == actualTaskCountMin) {
278290
log.debug(
279291
"CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].",

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
434434
Assert.assertEquals(1, taskCountBeforeScaleOut);
435435
Thread.sleep(1000);
436436
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
437-
Assert.assertEquals(2, taskCountAfterScaleOut);
437+
Assert.assertEquals(3, taskCountAfterScaleOut);
438438
Assert.assertTrue(
439439
dynamicActionEmitter
440440
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In
470470
EasyMock.replay(taskMaster);
471471

472472
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
473-
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
474-
{
475-
@Override
476-
public int getActiveTaskGroupsCount()
477-
{
478-
return 2;
479-
}
480-
};
473+
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
481474

482475
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
483476
supervisor,
@@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount()
488481
spec,
489482
dynamicActionEmitter
490483
);
484+
supervisor.getIoConfig().setTaskCount(2);
491485
supervisor.start();
492486
autoScaler.start();
493487
supervisor.runInternal();

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java

Lines changed: 144 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,22 @@
2323
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
2424
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
2525
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
26+
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
27+
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
2628
import org.joda.time.Duration;
2729
import org.junit.Assert;
2830
import org.junit.Before;
2931
import org.junit.Test;
32+
import org.mockito.ArgumentCaptor;
3033
import org.mockito.Mockito;
3134

35+
import java.util.List;
3236

3337
import static org.mockito.ArgumentMatchers.any;
3438
import static org.mockito.Mockito.doReturn;
3539
import static org.mockito.Mockito.mock;
3640
import static org.mockito.Mockito.spy;
41+
import static org.mockito.Mockito.verify;
3742
import static org.mockito.Mockito.when;
3843

3944
public class CostBasedAutoScalerMockTest
@@ -59,7 +64,7 @@ public void setUp()
5964
mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
6065

6166
when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
62-
when(mockSpec.getDataSources()).thenReturn(java.util.List.of("test-datasource"));
67+
when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
6368
when(mockSpec.isSuspended()).thenReturn(false);
6469
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
6570
when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
@@ -232,6 +237,60 @@ public void testScaleUpFromMinimumTasks()
232237
);
233238
}
234239

240+
@Test
241+
public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
242+
{
243+
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
244+
.taskCountMax(100)
245+
.taskCountMin(50)
246+
.enableTaskAutoScaler(true)
247+
.build();
248+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
249+
250+
final int configuredTaskCount = 1;
251+
final int taskCountMin = 50;
252+
253+
// Mock computeOptimalTaskCount to return a value different from the boundary,
254+
// so the assertion proves the boundary clamping path was taken.
255+
doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any());
256+
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2);
257+
258+
final int result = autoScaler.computeTaskCountForScaleAction();
259+
260+
Assert.assertEquals(
261+
"Should scale to taskCountMin when the configured task count is below the minimum boundary",
262+
taskCountMin,
263+
result
264+
);
265+
}
266+
267+
@Test
268+
public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
269+
{
270+
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
271+
.taskCountMax(50)
272+
.taskCountMin(1)
273+
.enableTaskAutoScaler(true)
274+
.build();
275+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
276+
277+
final int configuredTaskCount = 100;
278+
final int taskCountMax = 50;
279+
280+
// Mock computeOptimalTaskCount to return a value different from the boundary,
281+
// so the assertion proves the boundary clamping path was taken.
282+
doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any());
283+
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
284+
285+
final int result = autoScaler.computeTaskCountForScaleAction();
286+
287+
Assert.assertEquals(
288+
"Should scale to taskCountMax when the configured task count is above the maximum boundary",
289+
taskCountMax,
290+
result
291+
);
292+
}
293+
235294
@Test
236295
public void testScaleUpToMaximumTasks()
237296
{
@@ -357,6 +416,89 @@ public void testScaleDownAllowedDuringRolloverWhenScaleDownOnRolloverOnlyEnabled
357416
);
358417
}
359418

419+
@Test
420+
public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax()
421+
{
422+
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
423+
.taskCountMax(10)
424+
.taskCountMin(1)
425+
.enableTaskAutoScaler(true)
426+
.build();
427+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
428+
429+
final int currentTaskCount = 10; // already at max
430+
doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
431+
setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
432+
433+
Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
434+
435+
@SuppressWarnings("unchecked")
436+
ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
437+
verify(mockEmitter).emit(captor.capture());
438+
Assert.assertEquals(
439+
"Should emit 'Already at max task count' skip reason when current task count is at maximum",
440+
"Already at max task count",
441+
((ServiceMetricEvent.Builder) captor.getValue())
442+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
443+
);
444+
}
445+
446+
@Test
447+
public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
448+
{
449+
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
450+
.taskCountMax(100)
451+
.taskCountMin(10)
452+
.enableTaskAutoScaler(true)
453+
.build();
454+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
455+
456+
final int currentTaskCount = 10; // already at min
457+
doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
458+
setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
459+
460+
Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
461+
462+
@SuppressWarnings("unchecked")
463+
ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
464+
verify(mockEmitter).emit(captor.capture());
465+
Assert.assertEquals(
466+
"Should emit 'Already at min task count' skip reason when current task count is at minimum",
467+
"Already at min task count",
468+
((ServiceMetricEvent.Builder) captor.getValue())
469+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
470+
);
471+
}
472+
473+
@Test
474+
public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
475+
{
476+
// When min == max, current is simultaneously at both bounds.
477+
// The comment in the production code states that signaling max has higher priority.
478+
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
479+
.taskCountMax(5)
480+
.taskCountMin(5)
481+
.enableTaskAutoScaler(true)
482+
.build();
483+
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
484+
485+
final int currentTaskCount = 5; // at both min and max
486+
doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
487+
setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
488+
489+
Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
490+
491+
@SuppressWarnings("unchecked")
492+
ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
493+
verify(mockEmitter).emit(captor.capture());
494+
Assert.assertEquals(
495+
"Max skip reason should take priority over min skip reason when min equals max",
496+
"Already at max task count",
497+
((ServiceMetricEvent.Builder) captor.getValue())
498+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
499+
);
500+
}
501+
360502
private void setupMocksForMetricsCollection(
361503
CostBasedAutoScaler autoScaler,
362504
int taskCount,
@@ -377,22 +519,7 @@ private void setupMocksForMetricsCollection(
377519
SeekableStreamSupervisorIOConfig ioConfig = mock(SeekableStreamSupervisorIOConfig.class);
378520
doReturn(ioConfig).when(mockSupervisor).getIoConfig();
379521
doReturn(taskCount).when(ioConfig).getTaskCount();
522+
doReturn(STREAM_NAME).when(ioConfig).getStream();
380523
}
381524

382-
private CostMetrics createMetrics(
383-
double avgPartitionLag,
384-
int currentTaskCount,
385-
int partitionCount,
386-
double pollIdleRatio
387-
)
388-
{
389-
return new CostMetrics(
390-
avgPartitionLag,
391-
currentTaskCount,
392-
partitionCount,
393-
pollIdleRatio,
394-
TASK_DURATION_SECONDS,
395-
AVG_PROCESSING_RATE
396-
);
397-
}
398525
}

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ public void testScalingActionSkippedWhenMovingAverageRateUnavailable()
495495
when(supervisor.getIoConfig()).thenReturn(ioConfig);
496496
when(ioConfig.getStream()).thenReturn("test-stream");
497497
when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
498+
when(ioConfig.getTaskCount()).thenReturn(1);
498499
when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
499500
// No task stats means the moving average rate is unavailable
500501
when(supervisor.getStats()).thenReturn(Collections.emptyMap());

0 commit comments

Comments
 (0)