Merged
Changes from 3 commits
@@ -153,7 +153,7 @@ public static Stream<Arguments> getCompactionSupervisorTestParams()
}

@Test
-  @Timeout(20)
+  @Timeout(120)
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
{
final int maxRowsPerSegment = 1000;
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
lastKnownMetrics = collectMetrics();

final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    int currentTaskCount = supervisor.getIoConfig().getTaskCount();

// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
@jtuglu1 (Contributor), Apr 14, 2026:

Do we want to make sure we are not bounding to the task count min if, say, we don't have metrics temporarily (computeOptimalTaskCount returns -1)? Right now, I think we can trigger this condition because we're not checking for a non-negative value.

@Fly-Style (Contributor, Author), Apr 14, 2026:

Basically, in the code path where you have no metrics, you won't trigger it. You compute final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics); but only touch this value after the if (isScaleActionAllowed() && isTaskCountOutOfBounds) condition has been checked. If that condition holds, the rest of the code path returns taskCount from the method.

@jtuglu1 (Contributor), Apr 14, 2026:

Will we still emit metrics for hitting the minimum in that case?

Consider optimalTaskCount == -1 (the no-metrics case) where we cannot scale because isScaleActionAllowed() == false. Wouldn't we hit this line? IMO, it would be cleaner to handle the -1 case separately.

@Fly-Style (Contributor, Author), Apr 14, 2026:

Ah, fair, but here's why I did it: we're emitting min/max with the scalingSkipReason dimension, so it explicitly means 'no scaling was performed'.

Contributor:

This is kind of the reason I want to move this min/max check into the shared code at some point, so implementors don't need to care about it (just emit the task count and let the supervisor logic handle it). But for now, I think we can just handle this -1 case separately.

@jtuglu1 (Contributor), Apr 14, 2026:

> Ah, fair, but here's why I did it: we're emitting min/max with the scalingSkipReason dimension, so it explicitly means 'no scaling was performed'.

Yes, but the metric has a different meaning here. IMO, we should not be emitting a metric if we explicitly don't want to scale or don't want to make a decision on scaling. The metric being emitted means we wanted to scale up/down to some value that's outside the bounds of our spec. The value being -1 implies something else.

Contributor (Author):

> The metric being emitted means we wanted to scale up/down to some value that's outside the bounds of our spec.

That won't happen in the case of CBA; the algorithm has a hard limitation of picking potential values only within bounds.

@jtuglu1 (Contributor), Apr 14, 2026:

Ok; nevertheless, I'd still prefer to be defensive here and not assume anything about the future behavior of the algorithm or its outputs. IMO, treating the scaling algorithm as a black box allows us to be safer and provides a correctness layer against other bugs.
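To make the "handle -1 separately" suggestion concrete, here is a minimal hypothetical sketch. The class and method names (ScaleDecisionSketch, decideTaskCount, NO_METRICS) are invented for illustration and are not part of the patch:

```java
// Hypothetical sketch: treat the "no metrics" sentinel (-1) as its own early
// return, so the out-of-bounds clamp can never be triggered by a sentinel value
// and no boundary metric is emitted when no scaling decision was actually made.
public class ScaleDecisionSketch
{
    static final int NO_METRICS = -1;

    static int decideTaskCount(int optimalTaskCount, int currentTaskCount, int min, int max)
    {
        if (optimalTaskCount == NO_METRICS) {
            // No metrics available: skip the scale action entirely.
            return NO_METRICS;
        }
        if (currentTaskCount < min || currentTaskCount > max) {
            // A handwritten taskCount outside the configured bounds: scale back to the boundary.
            return Math.min(max, Math.max(min, currentTaskCount));
        }
        return optimalTaskCount;
    }

    public static void main(String[] args)
    {
        System.out.println(decideTaskCount(-1, 1, 50, 100));  // -1: skipped, even though 1 < min
        System.out.println(decideTaskCount(80, 1, 50, 100));  // 50: clamped to min
        System.out.println(decideTaskCount(80, 60, 50, 100)); // 80: normal path
    }
}
```

The ordering of the two checks is the whole point of the suggestion: the sentinel is consumed before the bounds check ever runs.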

|| currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
Contributor:

nit: use currentTaskCount

Contributor (Author):

Good catch! It is cleaner.

}

// Perform scale-up actions; scale-down actions only if configured.
final int taskCount;
-    if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {

+    // If task count is out of bounds, scale to the configured boundary
+    // regardless of optimal task count, to get back to a safe state.
+    if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
Contributor:

Do we want to respect isScaleActionAllowed() if we're violating the min/max task count bounds?

Contributor (Author):

That's a tricky thing, but my take here is: eventually we will scale (and by "eventually" I mean within minTriggerScaleActionFrequencyMillis ms), and it might be harmful to scale immediately.

I don't have a strong opinion here; I am open to removing isScaleActionAllowed() from the condition.

Contributor:

I think we can leave this as-is for now; maybe add a comment explaining the decision?

taskCount = currentTaskCount;
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Contributor:

Don't we want to set lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); here?

Contributor (Author):

Sure!

Contributor:

nit: let's include what we're scaling to. The other messages do.

} else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
@@ -212,7 +212,7 @@ private Runnable computeAndCollectLag()
* @param lags the lag metrics of Stream (Kafka/Kinesis)
* @return Integer, target number of tasksCount. -1 means skip scale action.
*/
-  private int computeDesiredTaskCount(List<Long> lags)
+  int computeDesiredTaskCount(List<Long> lags)
Contributor:

Let's mark this with the proper @VisibleForTesting annotation.

{
// if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
@@ -239,19 +239,30 @@ private int computeDesiredTaskCount(List<Long> lags)
withinProportion, spec.getId()
);

-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
Contributor:

We do not need to change this, no? We can still reference the activeTaskGroupCount and start clamping things below?

@Fly-Style (Contributor, Author), Apr 10, 2026:

I am still sure that this is the correct approach: the autoscaler explicitly changes taskCount and operates with different task count configurations.

Anyway, my aim in changing this was not only the change itself, but also to spark discussion.
@jtuglu1 WDYT regarding changing that?

@jtuglu1 (Contributor), Apr 10, 2026:

> I am still sure that this is the correct approach: the autoscaler explicitly changes taskCount and operates with different task count configurations.

I'm fine with this switch.

I'm generally for using more static configs when doing any kind of algorithm, as it helps reduce the chance of races, increases determinism/debuggability, etc. I would imagine in some cases there can be a lot of skew between supervisor.getActiveTaskGroupsCount() and supervisor.getIoConfig().getTaskCount(), for example during roll-over. In other words, I'm fine with reading a potentially slightly stale value as long as it's generally consistent with the current world view.

int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
+    final int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId());
return -1;
}

final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);

// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
// If that is happening, take the bound and return early.
final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin
|| currentActiveTaskCount > actualTaskCountMax;
if (isTaskCountOutOfBounds) {
currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount));
return currentActiveTaskCount;
}
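As an aside, the clamp expression used in this early-return path can be illustrated standalone. This is a hypothetical sketch for illustration only (ClampSketch is an invented name, not part of the patch):

```java
// Hypothetical standalone sketch of the clamp idiom used above:
// clamp(value, min, max) == Math.min(max, Math.max(min, value)).
public class ClampSketch
{
    static int clamp(int value, int min, int max)
    {
        return Math.min(max, Math.max(min, value));
    }

    public static void main(String[] args)
    {
        System.out.println(clamp(1, 50, 100));   // handwritten taskCount below min -> 50
        System.out.println(clamp(101, 50, 100)); // handwritten taskCount above max -> 100
        System.out.println(clamp(60, 50, 100));  // in-bounds value passes through -> 60
    }
}
```

Note the inner Math.max raises the value to the minimum first, and the outer Math.min then caps it at the maximum, so the result is always within [min, max] when min <= max.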

if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
-      int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
-
-      int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+      final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
if (currentActiveTaskCount == actualTaskCountMax) {
log.debug(
"CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].",
@@ -272,8 +283,7 @@ private int computeDesiredTaskCount(List<Long> lags)

if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
-      int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
-      int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+      final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == actualTaskCountMin) {
log.debug(
"CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].",
@@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
-    Assert.assertEquals(2, taskCountAfterScaleOut);
+    Assert.assertEquals(3, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In
EasyMock.replay(taskMaster);

StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
-    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
-    {
-      @Override
-      public int getActiveTaskGroupsCount()
-      {
-        return 2;
-      }
-    };
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);

LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
@@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount()
spec,
dynamicActionEmitter
);
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
@@ -232,6 +232,56 @@ public void testScaleUpFromMinimumTasks()
);
}

@Test
public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
{
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(50)
.enableTaskAutoScaler(true)
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));

final int configuredTaskCount = 1;
final int expectedTaskCount = 50;

doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2);

final int result = autoScaler.computeTaskCountForScaleAction();

Assert.assertEquals(
Contributor:

nit: Either mock computeOptimalTaskCount to return a value different from the clamped value (e.g. mock it to return -1 or taskCountMin - 1) and assert that the boundary is returned, or use verify(autoScaler, never()).computeOptimalTaskCount(any()) to confirm the early-return path was taken.

"Should scale to taskCountMin when the configured task count is below the minimum boundary",
expectedTaskCount,
result
);
}

@Test
public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
{
CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
.taskCountMax(50)
.taskCountMin(1)
.enableTaskAutoScaler(true)
.build();
CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));

final int configuredTaskCount = 100;
final int expectedTaskCount = 50;

doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);

final int result = autoScaler.computeTaskCountForScaleAction();

Assert.assertEquals(
"Should scale to taskCountMax when the configured task count is above the maximum boundary",
expectedTaskCount,
result
);
}

@Test
public void testScaleUpToMaximumTasks()
{
@@ -495,6 +495,7 @@ public void testScalingActionSkippedWhenMovingAverageRateUnavailable()
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("test-stream");
when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
when(ioConfig.getTaskCount()).thenReturn(1);
when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
// No task stats means the moving average rate is unavailable
when(supervisor.getStats()).thenReturn(Collections.emptyMap());
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.when;


public class LagBasedAutoScalerTest
{
private static final int PARTITION_COUNT = 100;

private SupervisorSpec mockSpec;
private SeekableStreamSupervisor mockSupervisor;
private SeekableStreamSupervisorIOConfig mockIoConfig;
private ServiceEmitter mockEmitter;
private LagBasedAutoScalerConfig config;

@Before
public void setUp()
{
mockSpec = Mockito.mock(SupervisorSpec.class);
mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
mockEmitter = Mockito.mock(ServiceEmitter.class);
mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);

when(mockSpec.getId()).thenReturn("test-supervisor");
when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn("test-stream");

    config = new LagBasedAutoScalerConfig(
        30_000L,     // lagCollectionIntervalMillis
        300_000L,    // lagCollectionRangeMillis
        300_000L,    // scaleActionStartDelayMillis
        60_000L,     // scaleActionPeriodMillis
        2_000_000L,  // scaleOutThreshold
        300_000L,    // scaleInThreshold
        0.7,         // triggerScaleOutFractionThreshold
        0.9,         // triggerScaleInFractionThreshold
        100,         // taskCountMax
        null,        // taskCountStart
        50,          // taskCountMin
        1,           // scaleInStep
        4,           // scaleOutStep
        true,        // enableTaskAutoScaler
        6_000_000L,  // minTriggerScaleActionFrequencyMillis
        null,        // lagAggregate
        null         // stopTaskCountRatio
    );
}

/**
* Verifies that scale-out uses the configured task count as the baseline.
*/
@Test
public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
{
when(mockIoConfig.getTaskCount()).thenReturn(50);
when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

Assert.assertEquals(54, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
}

@Test
public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
{
when(mockIoConfig.getTaskCount()).thenReturn(1);
when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

Assert.assertEquals(50, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
}

@Test
public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
{
when(mockIoConfig.getTaskCount()).thenReturn(101);
when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

Assert.assertEquals(100, createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
}

private LagBasedAutoScaler createAutoScaler()
{
return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter);
}

private List<Long> createLagSamples(long lag)
{
return new ArrayList<>(Collections.nCopies(11, lag));
}
}