Skip to content

Commit 7ffc46f

Browse files
committed
Ref #27090: parse k8s events + re-write retry to be faster
1 parent 173400a commit 7ffc46f

7 files changed

Lines changed: 248 additions & 78 deletions

File tree

src/main/java/eu/openanalytics/containerproxy/backend/docker/DockerSwarmBackend.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
182182
container.getParameters().put(PARAM_SERVICE_ID, serviceId);
183183

184184
// Give the service some time to start up and launch a container.
185-
boolean containerFound = Retrying.retry(i -> {
185+
boolean containerFound = Retrying.retry((i, maxAttempts) -> {
186186
try {
187187
Task serviceTask = dockerClient
188188
.listTasks(Task.Criteria.builder().serviceName(serviceName).build())
@@ -194,7 +194,7 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
194194
throw new RuntimeException("Failed to inspect swarm service tasks", e);
195195
}
196196
return (container.getId() != null);
197-
}, 30, 2000, true);
197+
}, 60_000, true);
198198

199199
if (!containerFound) {
200200
dockerClient.removeService(serviceId);

src/main/java/eu/openanalytics/containerproxy/backend/kubernetes/KubernetesBackend.java

Lines changed: 70 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -297,58 +297,30 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
297297
Pod startedPod = kubeClient.pods().inNamespace(effectiveKubeNamespace).create(patchedPod);
298298

299299
int totalWaitMs = Integer.parseInt(environment.getProperty("proxy.kubernetes.pod-wait-time", "60000"));
300-
int maxTries = totalWaitMs / 1000;
301-
Retrying.retry(i -> {
302-
if (!Readiness.getInstance().isReady(kubeClient.resource(startedPod).fromServer().get())) {
303-
if (i > 1 && log != null) log.debug(String.format("Container not ready yet, trying again (%d/%d)", i, maxTries));
304-
return false;
305-
}
306-
return true;
300+
boolean podReady = Retrying.retry((currentAttempt, maxAttempts) -> {
301+
if (!Readiness.getInstance().isReady(kubeClient.resource(startedPod).fromServer().get())) {
302+
if (currentAttempt > 0 && log != null) log.debug(String.format("Container not ready yet, trying again (%d/%d)", currentAttempt, maxAttempts));
303+
return false;
307304
}
308-
, maxTries, 1000);
309-
if (!Readiness.getInstance().isReady(kubeClient.resource(startedPod).fromServer().get())) {
310-
Pod pod = kubeClient.resource(startedPod).fromServer().get();
311-
container.getParameters().put(PARAM_POD, pod);
312-
proxy.getContainers().add(container);
313-
314-
proxyStatusService.containerStartFailed(proxy, container);
315-
throw new ContainerProxyException("Container did not become ready in time");
316-
}
317-
proxyStatusService.containerStarted(proxy, container);
318-
Pod pod = kubeClient.resource(startedPod).fromServer().get();
319-
320-
// TODO check k8s compatibility
321-
List<Event> events = kubeClient.v1().events().withInvolvedObject(new ObjectReferenceBuilder()
322-
.withKind("Pod")
323-
.withName(pod.getMetadata().getName())
324-
.withNamespace(pod.getMetadata().getNamespace())
325-
.build()).list().getItems();
326-
327-
LocalDateTime pullingTime = null;
328-
LocalDateTime pulledTime = null;
329-
LocalDateTime scheduledTime = null;
330-
331-
for (Event event : events) {
332-
if (event.getCount() != null && event.getCount() > 1) {
333-
// ignore events which happened multiple time as we are unable to properly process them
334-
continue;
335-
}
336-
if (event.getReason().equalsIgnoreCase("Pulling")) {
337-
pullingTime = OffsetDateTime.parse(event.getLastTimestamp()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
338-
} else if (event.getReason().equalsIgnoreCase("Pulled")) {
339-
pulledTime = OffsetDateTime.parse(event.getLastTimestamp()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
340-
} else if (event.getReason().equalsIgnoreCase("Scheduled")) {
341-
scheduledTime = OffsetDateTime.parse(event.getEventTime().getTime()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
305+
return true;
306+
}, totalWaitMs);
307+
308+
if (!podReady) {
309+
// check a final time whether the pod is ready
310+
if (!Readiness.getInstance().isReady(kubeClient.resource(startedPod).fromServer().get())) {
311+
Pod pod = kubeClient.resource(startedPod).fromServer().get();
312+
container.getParameters().put(PARAM_POD, pod);
313+
proxy.getContainers().add(container);
314+
315+
proxyStatusService.containerStartFailed(proxy, container);
316+
throw new ContainerProxyException("Container did not become ready in time");
342317
}
343318
}
344319

345-
if (pullingTime != null && pulledTime != null) {
346-
proxyStatusService.imagePulled(proxy, container, pullingTime, pulledTime);
347-
}
320+
proxyStatusService.containerStarted(proxy, container);
321+
Pod pod = kubeClient.resource(startedPod).fromServer().get();
348322

349-
if (scheduledTime != null) {
350-
proxyStatusService.containerScheduled(proxy, container, scheduledTime);
351-
}
323+
parseKubernetesEvents(proxy, container, pod);
352324

353325
Service service = null;
354326
if (isUseInternalNetwork()) {
@@ -374,7 +346,7 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
374346
.build());
375347

376348
// Workaround: waitUntilReady appears to be buggy.
377-
Retrying.retry(i -> isServiceReady(kubeClient.resource(startupService).fromServer().get()), 60, 1000);
349+
Retrying.retry((currentAttempts, maxAttempts) -> isServiceReady(kubeClient.resource(startupService).fromServer().get()), 60_000);
378350

379351
service = kubeClient.resource(startupService).fromServer().get();
380352
}
@@ -400,6 +372,56 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
400372
return container;
401373
}
402374

375+
private LocalDateTime getEventTime(Event event) {
376+
if (event.getEventTime() != null && event.getEventTime().getTime() != null) {
377+
return OffsetDateTime.parse(event.getEventTime().getTime()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
378+
}
379+
380+
if (event.getFirstTimestamp() != null) {
381+
return OffsetDateTime.parse(event.getFirstTimestamp()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
382+
}
383+
384+
if (event.getLastTimestamp() != null) {
385+
return OffsetDateTime.parse(event.getLastTimestamp()).atZoneSameInstant(ZoneId.systemDefault()).toLocalDateTime();
386+
}
387+
388+
return null;
389+
}
390+
391+
private void parseKubernetesEvents(Proxy proxy, Container container, Pod pod) {
392+
List<Event> events = kubeClient.v1().events().withInvolvedObject(new ObjectReferenceBuilder()
393+
.withKind("Pod")
394+
.withName(pod.getMetadata().getName())
395+
.withNamespace(pod.getMetadata().getNamespace())
396+
.build()).list().getItems();
397+
398+
LocalDateTime pullingTime = null;
399+
LocalDateTime pulledTime = null;
400+
LocalDateTime scheduledTime = null;
401+
402+
for (Event event : events) {
403+
if (event.getCount() != null && event.getCount() > 1) {
404+
// ignore events which happened multiple times, as we are unable to properly process them
405+
continue;
406+
}
407+
if (event.getReason().equalsIgnoreCase("Pulling")) {
408+
pullingTime = getEventTime(event);
409+
} else if (event.getReason().equalsIgnoreCase("Pulled")) {
410+
pulledTime = getEventTime(event);
411+
} else if (event.getReason().equalsIgnoreCase("Scheduled")) {
412+
scheduledTime = getEventTime(event);
413+
}
414+
}
415+
416+
if (pullingTime != null && pulledTime != null) {
417+
proxyStatusService.imagePulled(proxy, container, pullingTime, pulledTime);
418+
}
419+
420+
if (scheduledTime != null) {
421+
proxyStatusService.containerScheduled(proxy, container, scheduledTime);
422+
}
423+
}
424+
403425
private JsonPatch readPatchFromSpec(ContainerSpec containerSpec, Proxy proxy) throws JsonMappingException, JsonProcessingException {
404426
String patchAsString = proxy.getSpec().getKubernetesPodPatch();
405427
if (patchAsString == null) {

src/main/java/eu/openanalytics/containerproxy/model/runtime/ProxyStartupLog.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
1+
/**
2+
* ContainerProxy
3+
*
4+
* Copyright (C) 2016-2021 Open Analytics
5+
*
6+
* ===========================================================================
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the Apache License as published by
10+
* The Apache Software Foundation, either version 2 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* Apache License for more details.
17+
*
18+
* You should have received a copy of the Apache License
19+
* along with this program. If not, see <http://www.apache.org/licenses/>
20+
*/
121
package eu.openanalytics.containerproxy.model.runtime;
222

323
import java.time.Duration;

src/main/java/eu/openanalytics/containerproxy/service/AppRecoveryService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,14 @@ public void recoverRunningApps() throws Exception {
123123
Proxy proxy = proxies.get(proxyId);
124124
Container container = new Container();
125125
container.setId(containerInfo.getContainerId());
126-
// TODO
127-
// container.setIndex(spec.getIndex());
128126
container.setParameters(containerInfo.getParameters());
129127
ContainerSpec containerSpec = proxy.getSpec().getContainerSpec(containerInfo.getImage());
130128
if (containerSpec == null) {
131129
log.warn(String.format("Found existing container (%s) but not corresponding container spec.", containerInfo.getContainerId()));
132130
continue;
133131
}
134132
container.setSpec(containerSpec);
133+
container.setIndex(containerSpec.getIndex());
135134
proxy.addContainer(container);
136135
proxy.setStatus(ProxyStatus.Up);
137136

src/main/java/eu/openanalytics/containerproxy/service/ProxyStatusService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
1+
/**
2+
* ContainerProxy
3+
*
4+
* Copyright (C) 2016-2021 Open Analytics
5+
*
6+
* ===========================================================================
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the Apache License as published by
10+
* The Apache Software Foundation, either version 2 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* Apache License for more details.
17+
*
18+
* You should have received a copy of the Apache License
19+
* along with this program. If not, see <http://www.apache.org/licenses/>
20+
*/
121
package eu.openanalytics.containerproxy.service;
222

323
import eu.openanalytics.containerproxy.model.runtime.Container;

src/main/java/eu/openanalytics/containerproxy/util/Retrying.java

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,61 @@
2020
*/
2121
package eu.openanalytics.containerproxy.util;
2222

23-
import java.util.function.IntPredicate;
24-
2523
public class Retrying {
2624

27-
public static boolean retry(IntPredicate job, int tries, int waitTime) {
28-
return retry(job, tries, waitTime, false);
29-
}
30-
31-
public static boolean retry(IntPredicate job, int tries, int waitTime, boolean retryOnException) {
32-
boolean retVal = false;
33-
RuntimeException exception = null;
34-
for (int currentTry = 1; currentTry <= tries; currentTry++) {
35-
try {
36-
if (job.test(currentTry)) {
37-
retVal = true;
38-
exception = null;
39-
break;
40-
}
41-
} catch (RuntimeException e) {
42-
if (retryOnException) exception = e;
43-
else throw e;
44-
}
45-
try { Thread.sleep(waitTime); } catch (InterruptedException ignore) {}
46-
}
47-
if (exception == null) return retVal;
48-
else throw exception;
49-
50-
}
25+
@FunctionalInterface
26+
public interface Attempt {
27+
boolean attempt(int currentAttempt, int maxAttempts);
28+
}
29+
30+
public static boolean retry(Attempt job, int maxDelay) {
31+
return retry(job, maxDelay, false);
32+
}
33+
34+
public static boolean retry(Attempt job, int maxDelay, boolean retryOnException) {
35+
boolean retVal = false;
36+
RuntimeException exception = null;
37+
int maxAttempts = numberOfAttempts(maxDelay);
38+
for (int currentAttempt = 0; currentAttempt < maxAttempts; currentAttempt++) {
39+
delay(currentAttempt); // sleep before each attempt (no sleep before the first) so that we never sleep after the last attempt
40+
try {
41+
if (job.attempt(currentAttempt, maxAttempts)) {
42+
retVal = true;
43+
exception = null;
44+
break;
45+
}
46+
} catch (RuntimeException e) {
47+
if (retryOnException) exception = e;
48+
else throw e;
49+
}
50+
}
51+
if (exception == null) return retVal;
52+
else throw exception;
53+
}
54+
55+
public static void delay(Integer attempt) {
56+
try {
57+
if (attempt == 0) {
58+
} else if (attempt <= 5) {
59+
Thread.sleep(200);
60+
} else if (attempt <= 10) {
61+
Thread.sleep(400);
62+
} else {
63+
Thread.sleep(2_000);
64+
}
65+
} catch (InterruptedException ignore) {
66+
}
67+
}
68+
69+
public static int numberOfAttempts(Integer maxDelay) {
70+
if (maxDelay < 2_000) {
71+
throw new IllegalArgumentException("The maximum delay should at least be 2000ms");
72+
}
73+
// the first 11 attempts (1 immediate, 5 x 200ms, 5 x 400ms) account for 3 000ms of delay; every further attempt adds 2 000ms
74+
return (int) Math.ceil((maxDelay - 3_000) / 2_000.0) + 11;
75+
}
76+
5177
}
78+
79+
80+
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* ContainerProxy
3+
*
4+
* Copyright (C) 2016-2021 Open Analytics
5+
*
6+
* ===========================================================================
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the Apache License as published by
10+
* The Apache Software Foundation, either version 2 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* Apache License for more details.
17+
*
18+
* You should have received a copy of the Apache License
19+
* along with this program. If not, see <http://www.apache.org/licenses/>
20+
*/
21+
package eu.openanalytics.containerproxy.test.unit;
22+
23+
import eu.openanalytics.containerproxy.ContainerProxyApplication;
24+
import eu.openanalytics.containerproxy.test.proxy.PropertyOverrideContextInitializer;
25+
import eu.openanalytics.containerproxy.test.proxy.TestProxyService;
26+
import eu.openanalytics.containerproxy.util.Retrying;
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.springframework.boot.test.context.SpringBootTest;
31+
import org.springframework.test.context.ActiveProfiles;
32+
import org.springframework.test.context.ContextConfiguration;
33+
import org.springframework.test.context.junit.jupiter.SpringExtension;
34+
35+
import java.time.Duration;
36+
import java.time.Instant;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
39+
@SpringBootTest(classes = {TestProxyService.TestConfiguration.class, ContainerProxyApplication.class})
40+
@ExtendWith(SpringExtension.class)
41+
@ActiveProfiles("test")
42+
@ContextConfiguration(initializers = PropertyOverrideContextInitializer.class)
43+
public class RetryingTest {
44+
45+
@Test
46+
public void testNumberOfAttempts() {
47+
Assertions.assertEquals(Retrying.numberOfAttempts(3_000), 11);
48+
Assertions.assertEquals(Retrying.numberOfAttempts(10_000), 15);
49+
Assertions.assertEquals(Retrying.numberOfAttempts(60_000), 40); // compared to 30 when not using the faster checks
50+
}
51+
52+
@Test
53+
public void testFastDelay() {
54+
Instant start = Instant.now();
55+
AtomicInteger called = new AtomicInteger(0);
56+
Retrying.retry((i, m) -> {
57+
called.incrementAndGet();
58+
return false;
59+
}, 3_000);
60+
long time = Duration.between(start, Instant.now()).toMillis();
61+
Assertions.assertTrue(time > 3_000);
62+
Assertions.assertTrue(time < 5_000);
63+
Assertions.assertEquals(11, called.get());
64+
}
65+
66+
@Test
67+
public void testSlowDelay() {
68+
Instant start = Instant.now();
69+
AtomicInteger called = new AtomicInteger(0);
70+
Retrying.retry((i, m) -> {
71+
called.incrementAndGet();
72+
return false;
73+
}, 10_000);
74+
long time = Duration.between(start, Instant.now()).toMillis();
75+
Assertions.assertTrue(time > 10_000);
76+
Assertions.assertTrue(time < 12_000);
77+
Assertions.assertEquals(15, called.get());
78+
}
79+
80+
}

0 commit comments

Comments
 (0)