Skip to content

Commit 9a5fe91

Browse files
committed
Fix #35404: stop ECS tasks on shutdown within timeout
1 parent 501a1de commit 9a5fe91

9 files changed

Lines changed: 139 additions & 33 deletions

File tree

src/main/java/eu/openanalytics/containerproxy/backend/AbstractContainerBackend.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.nio.file.Files;
5151
import java.nio.file.Path;
5252
import java.nio.file.Paths;
53+
import java.util.Collection;
5354
import java.util.HashMap;
5455
import java.util.Map;
5556
import java.util.Properties;
@@ -148,6 +149,17 @@ public void stopProxy(Proxy proxy) throws ContainerProxyException {
148149
}
149150
}
150151

152+
@Override
153+
public void stopProxies(Collection<Proxy> proxies) {
154+
for (Proxy proxy : proxies) {
155+
try {
156+
stopProxy(proxy);
157+
} catch (Exception exception) {
158+
log.error("Error while stopping proxy", exception);
159+
}
160+
}
161+
}
162+
151163
protected abstract void doStopProxy(Proxy proxy) throws Exception;
152164

153165
@Override

src/main/java/eu/openanalytics/containerproxy/backend/IContainerBackend.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.io.OutputStream;
3434
import java.net.URI;
35+
import java.util.Collection;
3536
import java.util.List;
3637
import java.util.Map;
3738
import java.util.function.BiConsumer;
@@ -59,6 +60,13 @@ public interface IContainerBackend {
5960
*/
6061
void stopProxy(Proxy proxy) throws ContainerProxyException;
6162

63+
/**
64+
* Stops the given proxies. Might use different mechanism to stop the proxies.
65+
*
66+
* @param proxies proxies to stop
67+
*/
68+
void stopProxies(Collection<Proxy> proxies);
69+
6270
default void pauseProxy(Proxy proxy) {
6371
throw new IllegalStateException("PauseProxy not supported by backend");
6472
}

src/main/java/eu/openanalytics/containerproxy/backend/dispatcher/DefaultProxyDispatcher.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.security.core.Authentication;
3333
import org.springframework.stereotype.Component;
3434

35+
import java.util.Collection;
36+
3537
@Component
3638
public class DefaultProxyDispatcher implements IProxyDispatcher {
3739

@@ -54,6 +56,11 @@ public void stopProxy(Proxy proxy, ProxyStopReason proxyStopReason) throws Conta
5456
containerBackend.stopProxy(proxy);
5557
}
5658

59+
@Override
60+
public void stopProxies(Collection<Proxy> proxies) {
61+
containerBackend.stopProxies(proxies);
62+
}
63+
5764
@Override
5865
public void pauseProxy(Proxy proxy) {
5966
containerBackend.pauseProxy(proxy);

src/main/java/eu/openanalytics/containerproxy/backend/dispatcher/IProxyDispatcher.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import eu.openanalytics.containerproxy.model.spec.ProxySpec;
2929
import org.springframework.security.core.Authentication;
3030

31+
import java.util.Collection;
32+
3133
public interface IProxyDispatcher {
3234

3335
Proxy startProxy(Authentication user, Proxy proxy, ProxySpec spec, ProxyStartupLog.ProxyStartupLogBuilder proxyStartupLogBuilder) throws ProxyFailedToStartException;
@@ -38,6 +40,8 @@ default void stopProxy(Proxy proxy) throws ContainerProxyException {
3840
stopProxy(proxy, ProxyStopReason.Unknown);
3941
}
4042

43+
void stopProxies(Collection<Proxy> proxies);
44+
4145
void pauseProxy(Proxy proxy);
4246

4347
Proxy resumeProxy(Authentication user, Proxy proxy, ProxySpec proxySpec) throws ProxyFailedToStartException;
@@ -51,4 +55,5 @@ default void stopProxy(Proxy proxy) throws ContainerProxyException {
5155
boolean isProxyHealthy(Proxy proxy);
5256

5357
boolean isProxyHealthySupported();
58+
5459
}

src/main/java/eu/openanalytics/containerproxy/backend/dispatcher/proxysharing/ProxySharingDispatcher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import javax.inject.Inject;
5757
import java.time.Duration;
5858
import java.time.LocalDateTime;
59+
import java.util.Collection;
5960
import java.util.Objects;
6061
import java.util.UUID;
6162
import java.util.concurrent.CancellationException;
@@ -195,6 +196,17 @@ public void stopProxy(Proxy proxy, ProxyStopReason proxyStopReason) throws Conta
195196
cancelPendingDelegateProxy(proxy.getId());
196197
}
197198

199+
@Override
200+
public void stopProxies(Collection<Proxy> proxies) {
201+
for (Proxy proxy : proxies) {
202+
try {
203+
stopProxy(proxy);
204+
} catch (Exception e) {
205+
logger.error("Error while stopping proxy", e);
206+
}
207+
}
208+
}
209+
198210
@Override
199211
public void pauseProxy(Proxy proxy) {
200212
throw new IllegalStateException("ProxySharingDispatcher does not support pauseProxy.");

src/main/java/eu/openanalytics/containerproxy/backend/dispatcher/proxysharing/ProxySharingScaler.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,10 @@ private void markAllDelegateProxiesForRemoval() {
318318
}
319319

320320
private void reconcile() {
321+
if (executor.isShutdown() || executor.isTerminated()) {
322+
// see #35403
323+
return;
324+
}
321325
long numPendingSeats = getNumPendingSeats();
322326
long num = seatStore.getNumUnclaimedSeats() + numPendingSeats - pendingDelegatingProxies.size();
323327
debug(String.format("Status: %s, Unclaimed: %s + PendingDelegate: %s - PendingDelegating: %s = %s -> minimum: %s",
@@ -480,7 +484,7 @@ private Runnable createDelegateProxyJob(DelegateProxy originalDelegateProxy) {
480484
} catch (Throwable t2) {
481485
// log error, but ignore it otherwise
482486
// most important is that we remove the proxy from memory
483-
logError(originalDelegateProxy, t, "Error while stopping failed DelegateProxy");
487+
logError(originalDelegateProxy, t2, "Error while stopping failed DelegateProxy");
484488
}
485489
// remove seats and other data + trigger reconcile
486490
globalEventLoop.schedule(() -> markDelegateProxyForRemoval(id));
@@ -494,7 +498,7 @@ private Runnable createDelegateProxyJob(DelegateProxy originalDelegateProxy) {
494498
} catch (Throwable t2) {
495499
// log error, but ignore it otherwise
496500
// most important is that we remove the proxy from memory
497-
logError(originalDelegateProxy, t, "Error while stopping failed DelegateProxy");
501+
logError(originalDelegateProxy, t2, "Error while stopping failed DelegateProxy");
498502
}
499503
}
500504
// remove seats and other data + trigger reconcile
@@ -646,14 +650,19 @@ public Long getNumClaimedSeats() {
646650
}
647651

648652
public void stopAll() {
649-
executor.shutdown();
650653
try {
651-
executor.awaitTermination(3, TimeUnit.MINUTES);
652-
} catch (InterruptedException e) {
653-
throw new RuntimeException(e);
654-
}
655-
for (DelegateProxy delegateProxy : delegateProxyStore.getAllDelegateProxies()) {
656-
containerBackend.stopProxy(delegateProxy.getProxy());
654+
executor.shutdown();
655+
try {
656+
executor.awaitTermination(10, TimeUnit.SECONDS);
657+
executor.shutdownNow();
658+
executor.awaitTermination(120, TimeUnit.SECONDS);
659+
} catch (InterruptedException e) {
660+
// ignore
661+
}
662+
logger.info("Stopping {} delegateProxies", delegateProxyStore.getAllDelegateProxies().size());
663+
containerBackend.stopProxies(delegateProxyStore.getAllDelegateProxies().stream().map(DelegateProxy::getProxy).toList());
664+
} catch (Exception e) {
665+
logger.error("Error while stopping all delegateProxies", e);
657666
}
658667
}
659668

src/main/java/eu/openanalytics/containerproxy/backend/ecs/EcsBackend.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.commons.lang3.StringUtils;
4444
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
4545
import org.springframework.data.util.Pair;
46+
import org.springframework.scheduling.annotation.Scheduled;
4647
import org.springframework.security.core.Authentication;
4748
import org.springframework.stereotype.Component;
4849
import software.amazon.awssdk.regions.Region;
@@ -71,19 +72,22 @@
7172
import software.amazon.awssdk.services.ecs.model.Tag;
7273
import software.amazon.awssdk.services.ecs.model.Task;
7374
import software.amazon.awssdk.services.ecs.model.Volume;
75+
import software.amazon.awssdk.services.sts.StsClient;
7476

7577
import javax.annotation.PostConstruct;
7678
import javax.inject.Inject;
7779
import java.io.IOException;
7880
import java.net.URI;
7981
import java.util.ArrayList;
8082
import java.util.Arrays;
83+
import java.util.Collection;
8184
import java.util.HashMap;
8285
import java.util.List;
8386
import java.util.Map;
8487
import java.util.Objects;
8588
import java.util.Optional;
8689
import java.util.UUID;
90+
import java.util.concurrent.TimeUnit;
8791
import java.util.regex.Pattern;
8892
import java.util.stream.Stream;
8993

@@ -100,6 +104,7 @@ public class EcsBackend extends AbstractContainerBackend {
100104
private static final List<RuntimeValueKey<?>> IGNORED_RUNTIME_VALUES = Arrays.asList(PortMappingsKey.inst, UserGroupsKey.inst);
101105
private static final List<String> STARTING_STATES = List.of("PROVISIONING", "PENDING", "ACTIVATING");
102106
private static final List<String> STOPPING_STATES = List.of("DEACTIVATING", "STOPPING", "DEPROVISIONING", "STOPPED", "DELETED");
107+
private static final Tag TO_DELETE_TAG = Tag.builder().key("openanalytics.eu/sp-to-delete").value("true").build();
103108

104109
private EcsClient ecsClient;
105110
private Boolean enableCloudWatch;
@@ -111,6 +116,8 @@ public class EcsBackend extends AbstractContainerBackend {
111116
private int totalWaitMs;
112117
private String cluster;
113118
private String defaultRepositoryCredentialsParameter;
119+
private String region;
120+
private String accountId;
114121

115122
@Inject
116123
private IProxySpecProvider proxySpecProvider;
@@ -120,10 +127,13 @@ public class EcsBackend extends AbstractContainerBackend {
120127
public void initialize() {
121128
super.initialize();
122129

123-
String region = getProperty(PROPERTY_REGION);
130+
region = getProperty(PROPERTY_REGION);
124131
if (region == null) {
125132
throw new IllegalStateException("Error in configuration of ECS backend: proxy.ecs.region not set");
126133
}
134+
try (StsClient stsClient = StsClient.create()) {
135+
accountId = stsClient.getCallerIdentity().account();
136+
}
127137

128138
ecsClient = EcsClient.builder()
129139
.region(Region.of(region))
@@ -256,21 +266,24 @@ public Proxy startContainer(Authentication user, Container initialContainer, Con
256266
}, totalWaitMs, "ECS Task", 10, proxy, slog)) serviceReady = true;
257267
else serviceReady = false;
258268

269+
if (!serviceReady) {
270+
throw new ContainerFailedToStartException("Service failed to start", null, rContainerBuilder.build());
271+
}
272+
259273
proxyStartupLogBuilder.containerStarted(initialContainer.getIndex());
260274

261275
String image = ecsClient.describeTasks(builder -> builder.cluster(cluster).tasks(taskArn)).tasks().getFirst().containers().getFirst().image();
262276
rContainerBuilder.addRuntimeValue(new RuntimeValue(ContainerImageKey.inst, image), false);
263277

264-
if (!serviceReady) {
265-
throw new ContainerFailedToStartException("Service failed to start", null, rContainerBuilder.build());
266-
}
267-
268278
Map<Integer, Integer> portBindings = new HashMap<>();
269279
Container rContainer = rContainerBuilder.build();
270280
Map<String, URI> targets = setupPortMappingExistingProxy(proxy, rContainer, portBindings);
271281
return proxy.toBuilder().addTargets(targets).updateContainer(rContainer).build();
272282
} catch (ContainerFailedToStartException t) {
273283
throw t;
284+
} catch (InterruptedException interruptedException) {
285+
Thread.currentThread().interrupt();
286+
throw new ContainerFailedToStartException("ECS container failed to start", interruptedException, rContainerBuilder.build());
274287
} catch (Throwable throwable) {
275288
throw new ContainerFailedToStartException("ECS container failed to start", throwable, rContainerBuilder.build());
276289
}
@@ -433,6 +446,10 @@ private List<Secret> getSecrets(EcsSpecExtension specExtension) {
433446

434447
@Override
435448
protected void doStopProxy(Proxy proxy) {
449+
if (Thread.currentThread().isInterrupted()) {
450+
// use stopProxies on shutdown of ShinyProxy
451+
return;
452+
}
436453
for (Container container : proxy.getContainers()) {
437454
String taskArn = container.getRuntimeValue(BackendContainerNameKey.inst);
438455
ecsClient.stopTask(builder -> builder.cluster(cluster).task(taskArn));
@@ -449,18 +466,45 @@ protected void doStopProxy(Proxy proxy) {
449466
.cluster(cluster)
450467
.tasks(taskArn));
451468

452-
if (response.hasTasks() && !STOPPING_STATES.contains(response.tasks().getFirst().lastStatus())) {
469+
if (response.hasTasks() && !STOPPING_STATES.contains(response.tasks().getFirst().desiredStatus())) {
453470
return Retrying.FAILURE;
454471
}
455472
}
456473
return Retrying.SUCCESS;
457-
}, totalWaitMs, "Stopping ECS Task", 10, proxy, slog);
474+
}, totalWaitMs, "Stopping ECS Task", 0, proxy, slog);
458475

459476
if (!isInactive) {
460477
slog.warn(proxy, "Container did not get into stopping state");
461478
}
462479
}
463480

481+
@Override
482+
public void stopProxies(Collection<Proxy> proxies) {
483+
// ECS has strict rate limits, on shutdown we don't have enough time to stop all task and proxies
484+
// see https://docs.aws.amazon.com/AmazonECS/latest/APIReference/request-throttling.html
485+
// therefore we stop all tasks (rate limit = 40/s * 120s = 4800 tasks) and tag the TaskDefinitions with a tag (rate limit = 10/s)
486+
String taskDefinitionArnPrefix = String.format("arn:aws:ecs:%s:%s:task-definition/sp-task-definition-", region, accountId);
487+
String taskDefinitionArnSuffix = ":1";
488+
for (Proxy proxy : proxies) {
489+
for (Container container : proxy.getContainers()) {
490+
String taskArn = container.getRuntimeValue(BackendContainerNameKey.inst);
491+
try {
492+
ecsClient.stopTask(builder -> builder.cluster(cluster).task(taskArn));
493+
} catch (Exception e) {
494+
log.warn("Error stopping task: ", e);
495+
}
496+
try {
497+
ecsClient.tagResource(builder -> builder
498+
.resourceArn(taskDefinitionArnPrefix + proxy.getId() + taskDefinitionArnSuffix)
499+
.tags(TO_DELETE_TAG)
500+
);
501+
} catch (Exception e) {
502+
log.warn("Error tagging task definition: ", e);
503+
}
504+
}
505+
}
506+
}
507+
464508
@Override
465509
protected String getPropertyPrefix() {
466510
return PROPERTY_PREFIX;

src/main/java/eu/openanalytics/containerproxy/service/ProxyService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import eu.openanalytics.containerproxy.ContainerProxyException;
2525
import eu.openanalytics.containerproxy.ProxyFailedToStartException;
2626
import eu.openanalytics.containerproxy.ProxyStartValidationException;
27+
import eu.openanalytics.containerproxy.backend.dispatcher.IProxyDispatcher;
2728
import eu.openanalytics.containerproxy.backend.dispatcher.ProxyDispatcherService;
2829
import eu.openanalytics.containerproxy.backend.strategy.IProxyTestStrategy;
2930
import eu.openanalytics.containerproxy.event.ProxyPauseEvent;
@@ -64,11 +65,13 @@
6465
import java.time.Instant;
6566
import java.util.ArrayList;
6667
import java.util.Arrays;
68+
import java.util.Collection;
6769
import java.util.HashSet;
6870
import java.util.List;
6971
import java.util.Map;
7072
import java.util.Set;
7173
import java.util.UUID;
74+
import java.util.stream.Collectors;
7275
import java.util.stream.Stream;
7376

7477
/**
@@ -132,9 +135,13 @@ public void shutdown() {
132135
if (!stopAppsOnShutdown) {
133136
return;
134137
}
135-
for (Proxy proxy : proxyStore.getAllProxies()) {
138+
// group proxies by dispatcher
139+
Collection<Proxy> proxies = proxyStore.getAllProxies();
140+
Map<IProxyDispatcher, List<Proxy>> groups = proxies.stream().collect(Collectors.groupingBy(p -> proxyDispatcherService.getDispatcher(p.getSpecId())));
141+
for (var group : groups.entrySet()) {
136142
try {
137-
proxyDispatcherService.getDispatcher(proxy.getSpecId()).stopProxy(proxy);
143+
// stop proxies in group
144+
group.getKey().stopProxies(group.getValue());
138145
} catch (Exception exception) {
139146
log.error("Error during shutdown", exception);
140147
}
@@ -552,14 +559,14 @@ private Proxy startOrResumeProxy(Authentication user, Proxy proxy, ProxyStartupL
552559
} catch (Throwable t2) {
553560
// log error, but ignore it otherwise
554561
// most important is that we remove the proxy from memory
555-
slog.warn(t.getProxy(), t, "Error while collecting logs of failed proxy");
562+
slog.warn(t.getProxy(), t2, "Error while collecting logs of failed proxy");
556563
}
557564
try {
558565
proxyDispatcherService.getDispatcher(spec.getId()).stopProxy(t.getProxy());
559566
} catch (Throwable t2) {
560567
// log error, but ignore it otherwise
561568
// most important is that we remove the proxy from memory
562-
slog.warn(t.getProxy(), t, "Error while stopping failed proxy");
569+
slog.warn(t.getProxy(), t2, "Error while stopping failed proxy");
563570
}
564571
proxyStore.removeProxy(t.getProxy());
565572
applicationEventPublisher.publishEvent(new ProxyStartFailedEvent(t.getProxy()));

0 commit comments

Comments
 (0)