Skip to content

Commit 4977473

Browse files
committed
bug: use taskCount from ioConfig for scale action instead of activeTaskGroups
1 parent 0a4b772 commit 4977473

2 files changed

Lines changed: 5 additions & 11 deletions

File tree

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private Runnable computeAndCollectLag()
212212
* @param lags the lag metrics of Stream (Kafka/Kinesis)
213213
* @return Integer, target number of tasksCount. -1 means skip scale action.
214214
*/
215-
private int computeDesiredTaskCount(List<Long> lags)
215+
int computeDesiredTaskCount(List<Long> lags)
216216
{
217217
// if the supervisor is not suspended, ensure required tasks are running
218218
// if suspended, ensure tasks have been requested to gracefully stop
@@ -239,7 +239,7 @@ private int computeDesiredTaskCount(List<Long> lags)
239239
withinProportion, spec.getId()
240240
);
241241

242-
int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
242+
int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
243243
int desiredActiveTaskCount;
244244
int partitionCount = supervisor.getPartitionCount();
245245
if (partitionCount <= 0) {

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
434434
Assert.assertEquals(1, taskCountBeforeScaleOut);
435435
Thread.sleep(1000);
436436
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
437-
Assert.assertEquals(2, taskCountAfterScaleOut);
437+
Assert.assertEquals(3, taskCountAfterScaleOut);
438438
Assert.assertTrue(
439439
dynamicActionEmitter
440440
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws In
470470
EasyMock.replay(taskMaster);
471471

472472
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
473-
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
474-
{
475-
@Override
476-
public int getActiveTaskGroupsCount()
477-
{
478-
return 2;
479-
}
480-
};
473+
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
481474

482475
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
483476
supervisor,
@@ -488,6 +481,7 @@ public int getActiveTaskGroupsCount()
488481
spec,
489482
dynamicActionEmitter
490483
);
484+
supervisor.getIoConfig().setTaskCount(2);
491485
supervisor.start();
492486
autoScaler.start();
493487
supervisor.runInternal();

0 commit comments

Comments
 (0)