fix: harden ingesting autoscalers around task-count boundaries #19269
Fly-Style merged 6 commits into apache:master from
Conversation
Commit: taskCount from ioConfig for scale action instead of activeTaskGroups (force-pushed from 4977473 to 746c760)
// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
Do we want to respect this isScaleActionAllowed if we're violating min/max task count bounds?
That's a tricky thing, but my take here is: eventually we will scale (and by eventually I mean within minTriggerScaleActionFrequencyMillis ms), and it might be harmful to scale immediately.
I don't have a strong opinion here; I am open to removing isScaleActionAllowed() from the condition.
I think we can leave this as-is for now, maybe add a comment explaining the decision?
    || currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
  currentTaskCount = Math.min(config.getTaskCountMax(),
      Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
Good catch! It is cleaner.
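The clamp pattern in the snippet above can be illustrated in isolation; this is a minimal sketch of the Math.min/Math.max idiom being discussed, not the actual Druid code:

```java
public class Main {
    // Clamp a possibly hand-written task count into [min, max], mirroring the
    // Math.min(max, Math.max(min, taskCount)) expression in the snippet above.
    static int clamp(int taskCount, int min, int max) {
        return Math.min(max, Math.max(min, taskCount));
    }

    public static void main(String[] args) {
        System.out.println(clamp(0, 2, 10));  // below min: prints 2
        System.out.println(clamp(7, 2, 10));  // in range:  prints 7
        System.out.println(clamp(99, 2, 10)); // above max: prints 10
    }
}
```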
final int result = autoScaler.computeTaskCountForScaleAction();
...
Assert.assertEquals(
nit: Either mock computeOptimalTaskCount to return a value different from the clamped value (e.g. mock it to return -1 or taskCountMin - 1) and assert the boundary is returned, or use verify(autoScaler, never()).computeOptimalTaskCount(any()) to confirm the early-return path was taken.
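A minimal sketch of the second suggestion, using a hand-rolled counting stub instead of Mockito's verify(..., never()); all class and method names here are illustrative stand-ins, not the actual Druid API:

```java
import java.util.List;

// Illustrative stand-in for the autoscaler under test: when the current task
// count is out of bounds it clamps to the boundary and returns early, never
// consulting computeOptimalTaskCount().
class FakeAutoScaler {
    int optimalCalls = 0; // counts invocations of the optimal-count path
    final int min;
    final int max;
    final int current;

    FakeAutoScaler(int min, int max, int current) {
        this.min = min;
        this.max = max;
        this.current = current;
    }

    int computeOptimalTaskCount(List<Long> lags) {
        optimalCalls++;
        return -1; // irrelevant here; the test only checks this is never reached
    }

    int computeTaskCountForScaleAction() {
        boolean outOfBounds = current < min || current > max;
        if (outOfBounds) {
            return Math.min(max, Math.max(min, current)); // early return on clamp
        }
        return computeOptimalTaskCount(List.of());
    }
}

public class Main {
    public static void main(String[] args) {
        FakeAutoScaler scaler = new FakeAutoScaler(2, 10, 15); // 15 is above max
        System.out.println(scaler.computeTaskCountForScaleAction()); // 10: clamped
        System.out.println(scaler.optimalCalls); // 0: optimal path was skipped
    }
}
```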
 * @return Integer, target number of tasksCount. -1 means skip scale action.
 */
-private int computeDesiredTaskCount(List<Long> lags)
+int computeDesiredTaskCount(List<Long> lags)
Let's mark it with the proper @VisibleForTesting annotation.
// regardless of optimal task count, to get back to a safe state.
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
  taskCount = currentTaskCount;
  log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Don't we want to set lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis(); here?
);
-int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
We do not need to change this, no?
We can still reference activeTaskGroupsCount and start clamping things below?
I am still sure that this is the correct approach: the autoscaler explicitly changes taskCount and operates with different task-count configurations.
Anyway, my aim in changing this was not only the change itself, but also to spark discussion.
@jtuglu1 WDYT regarding changing that?
> I am still sure that this is the correct approach: the autoscaler explicitly changes taskCount and operates with different task-count configurations.

I'm fine with this switch.
I'm generally for using more static configs when doing any kind of algorithm, as it helps reduce the chance of races, increases determinism/debuggability, etc. I would imagine in some cases there can be lots of skew between supervisor.getActiveTaskGroupsCount() and supervisor.getIoConfig().getTaskCount(), for example during roll-over. In other words, I'm fine with reading a potentially slightly stale value as long as it's generally consistent with the current world view.
    SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
    "Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
I think we should try to move this min/max hit emission logic out of any specific auto-scaler implementation and into the generic DynamicScaleActionNotice logic. All an auto-scaler implementation should do is report the task count it recommends for a given supervisor. The supervisor has min/max task count configs and should, IMO, attempt to apply the auto-scaler recommendation. This way, we don't need to keep worrying about maintaining observability parity between the auto-scaler implementations.
Please file an issue for that; personally I don't have time right now to resolve it, but it absolutely makes sense!
In that case, can we emit metrics on the cost-based auto-scaler when we are capped by min/max? To keep consistent with lag-based auto-scaler? These metrics are useful for operators who may have automated mechanisms to go and, say, bump the max task count.
    Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
);
} else if (isAutoScalerAvailable) {
  this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin();
nit: Configs.valueOrDefault
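Configs.valueOrDefault is Druid's null-coalescing helper; the local stand-in below (hypothetical, for illustration only) shows the behavior the nit suggests replacing the ternary with:

```java
public class Main {
    // Hypothetical stand-in for Configs.valueOrDefault: return the value if
    // present, otherwise the supplied default.
    static int valueOrDefault(Integer value, int defaultValue) {
        return value == null ? defaultValue : value;
    }

    public static void main(String[] args) {
        int taskCountMin = 2;
        // replaces: taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin()
        System.out.println(valueOrDefault(null, taskCountMin)); // prints 2
        System.out.println(valueOrDefault(5, taskCountMin));    // prints 5
    }
}
```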
if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
  taskCount = currentTaskCount;
  lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
  log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
nit: let's include what we're scaling to. The other messages do.
// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
Do we want to make sure we are not bounding to the task count min if, say, we temporarily don't have metrics (computeOptimalTaskCount returns -1)? Right now, I think we can trigger this condition since we're not checking for a non-negative value.
Basically, in the code path where you have no metrics, you won't trigger it. See: you compute final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics); and touch this value only outside the code path where the if (isScaleActionAllowed() && isTaskCountOutOfBounds) condition was checked.
If that condition holds, the rest of the code path returns taskCount from the method.
Will we still emit metrics for hitting the minimum in that case?
Consider optimalTaskCount == -1 (the no-metrics case) while we cannot scale because isScaleActionAllowed() == false. Wouldn't we hit this line? IMO, it would be cleaner to handle the -1 case separately.
Ah, fair, but here's why I did it: we're emitting min/max with the scalingSkipReason dimension ... so it explicitly means 'no scaling was performed'.
This is kind of the reason I want to move this min/max check stuff into shared code at some point, so implementors don't need to care about it (just emit the task count and let the supervisor logic handle it). But, for now, I think we can just handle this -1 case separately.
> Ah, fair, but here's why I did it: we're emitting min/max with the scalingSkipReason dimension ... so it explicitly means 'no scaling was performed'.

Yes, but the metric has a different meaning here. IMO, we should not emit this metric if we explicitly don't want to scale or don't want to make a scaling decision. The metric being emitted means we wanted to scale up/down to some value outside the bounds of our spec. The value being -1 implies something else.
> The metric being emitted means we wanted to scale up/down to some value outside the bounds of our spec.

That won't happen in the case of CBA; the algorithm has a hard limitation of picking potential values only within bounds.
OK; nevertheless, I'd still prefer to be defensive here and not assume anything about the future behavior of the algorithm or its outputs. IMO, treating the scaling algorithm as a black box makes us safer and provides a correctness layer against other bugs.
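The "handle -1 separately" idea from this thread can be sketched as follows; the method and constant names are illustrative, not the actual Druid implementation:

```java
public class Main {
    static final int SKIP_SCALE_ACTION = -1; // sentinel: no metrics, no decision

    // Treat the sentinel separately from a real recommendation: only a real
    // recommendation is clamped to [min, max] (and would emit a capped metric).
    static int desiredTaskCount(int optimalTaskCount, int min, int max) {
        if (optimalTaskCount == SKIP_SCALE_ACTION) {
            // No metrics: skip scaling and emit no min/max-capped metric,
            // since no scaling decision was actually made.
            return SKIP_SCALE_ACTION;
        }
        return Math.min(max, Math.max(min, optimalTaskCount));
    }

    public static void main(String[] args) {
        System.out.println(desiredTaskCount(-1, 2, 10)); // prints -1: skipped
        System.out.println(desiredTaskCount(1, 2, 10));  // prints 2: clamped to min
        System.out.println(desiredTaskCount(50, 2, 10)); // prints 10: clamped to max
    }
}
```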
LGTM after CI passes. I think the comments re: not making assumptions about what the algorithm will return will make it easier to port the min/max bounds checking and metric emission logic to a shared piece of code later on (as well as defend against bugs/incorrect assumptions). They also help readability (assumptions explicitly documented, or logical guards against values like -1, help the reader understand the code).
This PR:

- uses taskCount from ioConfig for scale action calculations instead of activeTaskGroups;
- hardens the autoscalers against a taskCount outside the allowed bounds. For both cost-based and lag-based autoscalers, if the current taskCount is below taskCountMin or above taskCountMax, the scaler now returns the nearest valid boundary instead of using the out-of-range value as the scaling baseline. This keeps supervisors within configured limits and avoids inconsistent scaling decisions.

This PR has: