fix: harden ingesting autoscalers around task-count boundaries #19269
Changes from 3 commits
```diff
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
   lastKnownMetrics = collectMetrics();

   final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-  final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+  int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+  // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+  // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+  final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
+                                         || currentTaskCount > config.getTaskCountMax();
+  if (isTaskCountOutOfBounds) {
+    currentTaskCount = Math.min(config.getTaskCountMax(),
+                                Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
```
|
**Contributor:** nit: use

**Author:** Good catch! It is cleaner.
```diff
+  }

   // Perform scale-up actions; scale-down actions only if configured.
   final int taskCount;
-  if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+  // If task count is out of bounds, scale to the configured boundary
+  // regardless of optimal task count, to get back to a safe state.
+  if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
```
|
**Contributor:** Do we want to respect this

**Author:** That's a tricky thing, but my take here is: eventually we will scale (and by eventually I mean within

I don't have a strong opinion here, I am open to remove

**Contributor:** I think we can leave this as-is for now, maybe add a comment explaining the decision?
```diff
+    taskCount = currentTaskCount;
+    log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
```
|
**Contributor:** Don't we want to set:

**Author:** Sure!

**Contributor:** nit: let's include what we're scaling to. The other messages do.
```diff
+  } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
     taskCount = optimalTaskCount;
     lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
     log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
```
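As a standalone illustration of the clamp used above (hypothetical class and method names, not part of the PR), the boundary handling reduces to a nested min/max:

```java
// Illustrative sketch of the boundary clamp the patch applies:
// an out-of-bounds configured task count is pulled back into [min, max].
public class TaskCountClampSketch
{
  // Hypothetical helper mirroring Math.min(max, Math.max(min, value)).
  public static int clamp(int value, int min, int max)
  {
    return Math.min(max, Math.max(min, value));
  }

  public static void main(String[] args)
  {
    // A handwritten taskCount of 1 with bounds [50, 100] is raised to 50.
    System.out.println(clamp(1, 50, 100));  // prints 50
    // A handwritten taskCount of 101 with bounds [1, 50] is lowered to 50.
    System.out.println(clamp(101, 1, 50));  // prints 50
    // An in-bounds value passes through unchanged.
    System.out.println(clamp(25, 1, 50));   // prints 25
  }
}
```

Since an in-bounds value passes through unchanged, the patch only needs to perform the clamp when `isTaskCountOutOfBounds` is true.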
```diff
@@ -212,7 +212,7 @@ private Runnable computeAndCollectLag()
  * @param lags the lag metrics of Stream (Kafka/Kinesis)
  * @return Integer, target number of tasksCount. -1 means skip scale action.
  */
-private int computeDesiredTaskCount(List<Long> lags)
+int computeDesiredTaskCount(List<Long> lags)
```
|
**Contributor:** Let's mark this with the proper `@VisibleForTesting` annotation
```diff
 {
   // if the supervisor is not suspended, ensure required tasks are running
   // if suspended, ensure tasks have been requested to gracefully stop

@@ -239,19 +239,30 @@ private int computeDesiredTaskCount(List<Long> lags)
       withinProportion, spec.getId()
   );

-  int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+  int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
```
|
**Contributor:** We do not need to change this, no?

**Author:** I am still sure that is the correct approach - autoscaler explicitly changes

Anyway, my aim of changing this was not only the change itself, but also to spark discussion.

**Contributor:** I'm fine with this switch. I'm generally for using static configs when doing any kind of algorithm as it helps reduce the chance of races, increases determinism/debuggability, etc. I would imagine in some cases there can be lots of skew between
```diff
   int desiredActiveTaskCount;
-  int partitionCount = supervisor.getPartitionCount();
+  final int partitionCount = supervisor.getPartitionCount();
   if (partitionCount <= 0) {
     log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId());
     return -1;
   }

+  final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+  final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+
+  // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+  // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+  // If that is happening, take the bound and return early.
+  final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin
+                                         || currentActiveTaskCount > actualTaskCountMax;
+  if (isTaskCountOutOfBounds) {
+    currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount));
+    return currentActiveTaskCount;
+  }

   if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
     // Do Scale out
-    int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
-    int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+    final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
     if (currentActiveTaskCount == actualTaskCountMax) {
       log.debug(
           "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].",
```
|
|
```diff
@@ -272,8 +283,7 @@ private int computeDesiredTaskCount(List<Long> lags)
   if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
     // Do Scale in
-    int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
-    int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+    final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
     if (currentActiveTaskCount == actualTaskCountMin) {
       log.debug(
           "CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].",
```
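Taken together, the partition-count capping and the new early return can be sketched as a standalone function (hypothetical names; the real `computeDesiredTaskCount` additionally evaluates lag fractions before stepping the count):

```java
// Illustrative sketch of the new boundary handling in the lag-based
// autoscaler: bounds are first capped by the partition count, and an
// out-of-bounds current task count is clamped and returned immediately,
// before any scale-out/scale-in step is considered.
public class LagBoundsSketch
{
  // Hypothetical standalone version of the boundary handling only.
  public static int desiredTaskCount(int current, int configuredMin, int configuredMax, int partitionCount)
  {
    if (partitionCount <= 0) {
      return -1; // mirrors the "skip scale action" sentinel
    }
    // No point running more tasks (or requiring a minimum) beyond the partition count.
    final int actualMax = Math.min(configuredMax, partitionCount);
    final int actualMin = Math.min(configuredMin, partitionCount);
    if (current < actualMin || current > actualMax) {
      // Out of bounds: clamp and return early, skipping threshold evaluation.
      return Math.min(actualMax, Math.max(actualMin, current));
    }
    return current; // in bounds: the real code would now evaluate lag thresholds
  }
}
```

For example, a handwritten `taskCount` of 1 against bounds [50, 100] and 100 partitions would come back as 50, matching the `testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin` expectation below.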
```diff
@@ -232,6 +232,56 @@ public void testScaleUpFromMinimumTasks()
   );
 }

+@Test
+public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+{
+  CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
+                                                                     .taskCountMax(100)
+                                                                     .taskCountMin(50)
+                                                                     .enableTaskAutoScaler(true)
+                                                                     .build();
+  CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+  final int configuredTaskCount = 1;
+  final int expectedTaskCount = 50;
+
+  doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
+  setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2);
+
+  final int result = autoScaler.computeTaskCountForScaleAction();
+
+  Assert.assertEquals(
```
|
**Contributor:** nit: Either mock `computeOptimalTaskCount` to return a value different from the clamped value (e.g. mock it to return -1 or taskCountMin - 1) and assert the boundary is returned, or use `verify(autoScaler, never()).computeOptimalTaskCount(any())` to confirm the early-return path was taken.
```diff
+      "Should scale to taskCountMin when the configured task count is below the minimum boundary",
+      expectedTaskCount,
+      result
+  );
+}
+
+@Test
+public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+{
+  CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
+                                                                     .taskCountMax(50)
+                                                                     .taskCountMin(1)
+                                                                     .enableTaskAutoScaler(true)
+                                                                     .build();
+  CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+  final int configuredTaskCount = 100;
+  final int expectedTaskCount = 50;
+
+  doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
+  setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
+
+  final int result = autoScaler.computeTaskCountForScaleAction();
+
+  Assert.assertEquals(
+      "Should scale to taskCountMax when the configured task count is above the maximum boundary",
+      expectedTaskCount,
+      result
+  );
+}
+
 @Test
 public void testScaleUpToMaximumTasks()
 {
```
New test file (`@@ -0,0 +1,121 @@`):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.mockito.Mockito.when;

public class LagBasedAutoScalerTest
{
  private static final int PARTITION_COUNT = 100;

  private SupervisorSpec mockSpec;
  private SeekableStreamSupervisor mockSupervisor;
  private SeekableStreamSupervisorIOConfig mockIoConfig;
  private ServiceEmitter mockEmitter;
  private LagBasedAutoScalerConfig config;

  @Before
  public void setUp()
  {
    mockSpec = Mockito.mock(SupervisorSpec.class);
    mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
    mockEmitter = Mockito.mock(ServiceEmitter.class);
    mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);

    when(mockSpec.getId()).thenReturn("test-supervisor");
    when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
    when(mockIoConfig.getStream()).thenReturn("test-stream");

    config = new LagBasedAutoScalerConfig(
        30_000L,     // lagCollectionIntervalMillis
        300_000L,    // lagCollectionRangeMillis
        300_000L,    // scaleActionStartDelayMillis
        60_000L,     // scaleActionPeriodMillis
        2_000_000L,  // scaleOutThreshold
        300_000L,    // scaleInThreshold
        0.7,         // triggerScaleOutFractionThreshold
        0.9,         // triggerScaleInFractionThreshold
        100,         // taskCountMax
        null,        // taskCountStart
        50,          // taskCountMin
        1,           // scaleInStep
        4,           // scaleOutStep
        true,        // enableTaskAutoScaler
        6_000_000L,  // minTriggerScaleActionFrequencyMillis
        null,        // lagAggregate
        null         // stopTaskCountRatio
    );
  }

  /**
   * Verifies that scale-out uses the configured task count as the baseline.
   */
  @Test
  public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
  {
    when(mockIoConfig.getTaskCount()).thenReturn(50);
    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

    Assert.assertEquals(54, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
  }

  @Test
  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
  {
    when(mockIoConfig.getTaskCount()).thenReturn(1);
    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

    Assert.assertEquals(50, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
  }

  @Test
  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
  {
    when(mockIoConfig.getTaskCount()).thenReturn(101);
    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);

    Assert.assertEquals(100, createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
  }

  private LagBasedAutoScaler createAutoScaler()
  {
    return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter);
  }

  private List<Long> createLagSamples(long lag)
  {
    return new ArrayList<>(Collections.nCopies(11, lag));
  }
}
```
**Contributor:** Do we want to make sure we are not bounding to the task count min, say, if we don't have metrics temporarily (return -1 in `computeOptimalTaskCount`)? Right now, I think we can trigger this condition if we're not checking for a non-negative value.

**Author:** Basically, in the code path where you don't have metrics, you won't trigger it. See: you compute

`final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);`

and only touch this value outside the code path where the

`if (isScaleActionAllowed() && isTaskCountOutOfBounds)`

condition was checked. If that condition holds, the rest of the code path returns `taskCount` from the method.

**Contributor:** Will we still emit metrics for hitting the minimum in that case? Consider `optimalTaskCount == -1` (the no-metrics case) and we cannot scale due to `isScaleActionAllowed() == false`. Wouldn't we hit this line? IMO, it would be cleaner to handle the -1 case separately.

**Author:** Ah, fair, but here is why I did it: we're emitting min/max with the `scalingSkipReason` dimension ... so it explicitly means "no scaling was performed".

**Contributor:** This is kind of the reason I want to move this min/max check stuff into the shared code at some point, so implementors don't need to care about this stuff (just emit the task count and let supervisor logic handle it). But, for now, I think we can just handle this -1 case separately.

**Contributor:** Yes, but the metric has a different meaning here. IMO, we should not be emitting a metric if we explicitly don't want to scale or don't want to make a decision on scaling. The metric being emitted means we wanted to scale up/down to some value that's outside the bounds of our spec. The value being `-1` implies something else.

**Author:** That won't happen in the case of CBA; the algorithm has a hard limitation of picking potential values only within bounds.

**Contributor:** Ok, nevertheless, I'd still prefer to be defensive here and not assume anything about the future behavior of the algorithm or its outputs. IMO treating the scaling algorithm as a black box allows us to be safer and provides a correctness layer against other bugs.
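The "handle -1 separately" suggestion from this thread could look roughly as follows (a sketch under assumed names, not the merged code): the sentinel is checked before any boundary logic, so a missing-metrics result can never be mistaken for a scaling decision.

```java
// Illustrative sketch of defensively handling the "no metrics" sentinel
// before the out-of-bounds clamp, per the review discussion above.
public class SentinelGuardSketch
{
  public static final int SKIP_SCALE_ACTION = -1;

  // Hypothetical decision function: optimal < 0 means "no metrics, skip",
  // and is checked before any boundary logic runs.
  public static int decide(int optimal, int current, int min, int max)
  {
    if (optimal < 0) {
      return SKIP_SCALE_ACTION; // no metrics: make no decision, emit nothing
    }
    if (current < min || current > max) {
      return Math.min(max, Math.max(min, current)); // out of bounds: snap back
    }
    return optimal > current ? optimal : current; // scale up only
  }
}
```

With this ordering, a -1 from the algorithm short-circuits even when the current task count happens to be out of bounds, which treats the scaling algorithm as a black box as the reviewer prefers.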