Skip to content

Commit 3a09628

Browse files
committed
Ref #28746: use recyclable endpoint
1 parent 21fe55a commit 3a09628

7 files changed

Lines changed: 115 additions & 57 deletions

File tree

pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
2222
<junit.jupiter.version>5.6.0</junit.jupiter.version>
2323
<log4j.version>2.17.2</log4j.version>
24+
<okhttp.version>4.10.0</okhttp.version>
2425
</properties>
2526

2627
<distributionManagement>
@@ -53,6 +54,16 @@
5354
<groupId>io.fabric8</groupId>
5455
<artifactId>kubernetes-client</artifactId>
5556
<version>${version.fabric8.client}</version>
57+
<exclusions>
58+
<exclusion>
59+
<groupId>com.squareup.okhttp3</groupId>
60+
<artifactId>okhttp</artifactId>
61+
</exclusion>
62+
<exclusion>
63+
<groupId>com.squareup.okhttp3</groupId>
64+
<artifactId>logging-interceptor</artifactId>
65+
</exclusion>
66+
</exclusions>
5667
</dependency>
5768
<dependency>
5869
<groupId>org.apache.logging.log4j</groupId>
@@ -132,6 +143,16 @@
132143
<version>${junit.jupiter.version}</version>
133144
<scope>test</scope>
134145
</dependency>
146+
<dependency>
147+
<groupId>com.squareup.okhttp3</groupId>
148+
<artifactId>okhttp</artifactId>
149+
<version>${okhttp.version}</version>
150+
</dependency>
151+
<dependency>
152+
<groupId>com.squareup.okhttp3</groupId>
153+
<artifactId>logging-interceptor</artifactId>
154+
<version>${okhttp.version}</version>
155+
</dependency>
135156
</dependencies>
136157

137158
<build>

src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/PodRetriever.kt

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,52 +25,15 @@ import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
2525
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
2626
import io.fabric8.kubernetes.api.model.Pod
2727
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
28-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer
29-
import mu.KotlinLogging
3028

3129
class PodRetriever(private val client: NamespacedKubernetesClient) {
3230

33-
private val logger = KotlinLogging.logger {}
34-
private val namespaces = HashSet<String>()
35-
36-
fun addNamespace(namespace: String) {
37-
if (namespaces.add(namespace)) {
38-
logger.warn { "Now watching pods in the $namespace namespace. (total count = ${namespaces.size})" }
39-
}
40-
}
41-
42-
fun getPodsForShinyProxyInstance(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance): List<Pod> {
43-
val pods = arrayListOf<Pod>()
31+
fun getShinyProxyPods(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance): List<Pod> {
4432
val labels = mapOf(
45-
LabelFactory.PROXIED_APP to "true",
33+
LabelFactory.APP_LABEL to LabelFactory.APP_LABEL_VALUE,
4634
LabelFactory.INSTANCE_LABEL to shinyProxyInstance.hashOfSpec
4735
)
48-
49-
val namespacesToCheck = if (shinyProxyInstance.isLatestInstance) {
50-
shinyProxy.namespacesOfCurrentInstance
51-
} else {
52-
// We don't know the exact namespaces used by older ShinyProxyInstance, therefore we have to look into all namespaces.
53-
// We could save the list of namespaces in the status of the instance, if it turns out this is a performance bottleneck.
54-
// Note: that currently this function is only called for older SP instances and thus this else statement is actually always executed...
55-
namespaces
56-
}
57-
58-
logger.debug { "Looking for Pods managed by ${shinyProxyInstance.hashOfSpec} using $labels in $namespacesToCheck" }
59-
60-
for (namespace in namespacesToCheck) {
61-
pods.addAll(client.pods().inNamespace(namespace).withLabels(labels).list().items)
62-
}
63-
64-
logger.info { "PodCount: ${pods.size}, ${pods.map { it.metadata.namespace + "/" + it.metadata.name }}" }
65-
return pods
66-
}
67-
68-
fun addNamespaces(namespaces: List<String>) {
69-
namespaces.forEach { addNamespace(it) }
70-
}
71-
72-
fun getNamespaces(): Set<String> {
73-
return namespaces
36+
return client.pods().inNamespace(shinyProxy.metadata.namespace).withLabels(labels).list().items
7437
}
7538

7639
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* ShinyProxy-Operator
3+
*
4+
* Copyright (C) 2021-2022 Open Analytics
5+
*
6+
* ===========================================================================
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the Apache License as published by
10+
* The Apache Software Foundation, either version 2 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* Apache License for more details.
17+
*
18+
* You should have received a copy of the Apache License
19+
* along with this program. If not, see <http://www.apache.org/licenses/>
20+
*/
21+
package eu.openanalytics.shinyproxyoperator.controller
22+
23+
import com.fasterxml.jackson.annotation.JsonProperty
24+
import com.fasterxml.jackson.databind.ObjectMapper
25+
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
26+
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
27+
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
28+
import mu.KotlinLogging
29+
import okhttp3.OkHttpClient
30+
import okhttp3.Request
31+
import java.io.IOException
32+
import java.util.concurrent.TimeUnit
33+
34+
35+
class RecyclableChecker(
36+
private val podRetriever: PodRetriever,
37+
) {
38+
39+
private val logger = KotlinLogging.logger {}
40+
private val client: OkHttpClient = OkHttpClient.Builder()
41+
.connectTimeout(3, TimeUnit.SECONDS)
42+
.readTimeout(3, TimeUnit.SECONDS)
43+
.build()
44+
45+
private val objectMapper = ObjectMapper().registerKotlinModule()
46+
47+
data class Response(@JsonProperty("isRecyclable") val isRecyclable: Boolean, @JsonProperty("activeConnections") val activeConnections: Int)
48+
49+
fun isInstanceRecyclable(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance): Boolean {
50+
val pods = podRetriever.getShinyProxyPods(shinyProxy, shinyProxyInstance)
51+
52+
for (pod in pods) {
53+
val url = "http://${pod.status.podIP}:9090/actuator/recyclable"
54+
val request = Request.Builder()
55+
.url(url)
56+
.build()
57+
58+
val body = try {
59+
client.newCall(request).execute().body?.string()
60+
} catch (e: IOException) {
61+
logger.warn { "${shinyProxy.logPrefix(shinyProxyInstance)} unreachable for recyclable check (using ${url})" }
62+
// server unreachable -> do not delete it yet
63+
return false
64+
}
65+
if (body == null) {
66+
// server unreachable -> do not delete it yet
67+
return false
68+
}
69+
val resp = objectMapper.readValue(body, Response::class.java)
70+
if (!resp.isRecyclable) {
71+
logger.info { "${shinyProxy.logPrefix(shinyProxyInstance)} Replica has ${resp.activeConnections} open websocket connections" }
72+
return false
73+
}
74+
}
75+
76+
return true
77+
}
78+
79+
}

src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyController.kt

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import eu.openanalytics.shinyproxyoperator.ingress.IIngressController
3131
import io.fabric8.kubernetes.client.KubernetesClient
3232
import io.fabric8.kubernetes.client.KubernetesClientException
3333
import io.fabric8.kubernetes.client.informers.cache.Lister
34-
import io.fabric8.kubernetes.client.internal.readiness.Readiness
34+
import io.fabric8.kubernetes.client.readiness.Readiness
3535
import kotlinx.coroutines.CancellationException
3636
import kotlinx.coroutines.CoroutineScope
3737
import kotlinx.coroutines.Dispatchers
@@ -45,12 +45,13 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
4545
private val kubernetesClient: KubernetesClient,
4646
private val shinyProxyClient: ShinyProxyClient,
4747
private val ingressController: IIngressController,
48-
private val podRetriever: PodRetriever,
48+
podRetriever: PodRetriever,
4949
private val reconcileListener: IReconcileListener?) {
5050

5151
private val configMapFactory = ConfigMapFactory(kubernetesClient)
5252
private val serviceFactory = ServiceFactory(kubernetesClient)
5353
private val replicaSetFactory = ReplicaSetFactory(kubernetesClient)
54+
private val recyclableChecker = RecyclableChecker(podRetriever)
5455

5556
private val logger = KotlinLogging.logger {}
5657

@@ -279,7 +280,6 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
279280

280281
logger.debug { "${shinyProxy.logPrefix(shinyProxyInstance)} [Step 6/$amountOfSteps: Ok] [Component/Ingress]" }
281282

282-
podRetriever.addNamespaces(shinyProxy.namespacesOfCurrentInstance)
283283
if (updatedShinyProxyInstance != null) {
284284
reconcileListener?.onInstanceFullyReconciled(updatedShinyProxy, updatedShinyProxyInstance)
285285
}
@@ -296,14 +296,9 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
296296
// shinyProxyInstance is either the latest or the soon to be latest instance
297297
continue
298298
}
299-
300-
val pods = podRetriever.getPodsForShinyProxyInstance(shinyProxy, shinyProxyInstance)
301-
302-
if (pods.isEmpty()) {
303-
logger.info { "${shinyProxy.logPrefix(shinyProxyInstance)} ShinyProxyInstance has no running apps and is not the latest version => removing this instance" }
299+
if (recyclableChecker.isInstanceRecyclable(shinyProxy, shinyProxyInstance)) {
300+
logger.info { "${shinyProxy.logPrefix(shinyProxyInstance)} ShinyProxyInstance has no open websocket connections and is not the latest version => removing this instance" }
304301
deleteSingleShinyProxyInstance(resourceRetriever, shinyProxy, shinyProxyInstance)
305-
} else {
306-
logger.debug { "${shinyProxy.logPrefix(shinyProxyInstance)} ShinyProxyInstance has ${pods.size} running apps => not removing this instance" }
307302
}
308303
}
309304
}

src/test/kotlin/eu/openanalytics/shinyproxyoperator/MainIntegrationTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ class MainIntegrationTest : IntegrationTestBase() {
465465
spTestInstance.waitForOneReconcile()
466466

467467
// 4. assert namespaces are correctly loaded
468-
assertEquals(setOf("my-namespace", "itest"), operator.podRetriever.getNamespaces())
468+
// assertEquals(setOf("my-namespace", "itest"), operator.podRetriever.getNamespaces()) // TODO
469469

470470
// 5. assert correctness
471471
spTestInstance.assertInstanceIsCorrect()
@@ -477,7 +477,7 @@ class MainIntegrationTest : IntegrationTestBase() {
477477
delay(5000)
478478

479479
// 8. assert namespaces are still watched
480-
assertEquals(setOf("my-namespace", "itest"), operator.podRetriever.getNamespaces())
480+
// assertEquals(setOf("my-namespace", "itest"), operator.podRetriever.getNamespaces()) TODO
481481

482482
job.cancel()
483483

src/test/kotlin/eu/openanalytics/shinyproxyoperator/helpers/ChaosInterceptor.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ class ChaosInterceptor : Interceptor {
4141
override fun intercept(chain: Interceptor.Chain): Response {
4242
val request = chain.request()
4343

44-
if (request.method() != "GET" && request.method() != "DELETE" && Random.nextInt(0, 10) < 5) {
45-
logger.warn { "Intercepting request to ${request.method()} @ ${request.url()} -> returning 500" }
44+
if (request.method != "GET" && request.method != "DELETE" && Random.nextInt(0, 10) < 5) {
45+
logger.warn { "Intercepting request to ${request.method} @ ${request.url} -> returning 500" }
4646
throw KubernetesClientException(
47-
"The ${request.method()} operation could not be completed at this time, please try again",
47+
"The ${request.method} operation could not be completed at this time, please try again",
4848
HttpURLConnection.HTTP_INTERNAL_ERROR,
4949
StatusBuilder().withCode(HttpURLConnection.HTTP_INTERNAL_ERROR).build())
5050
}
5151

52-
if ((request.method() == "POST" || request.method() == "PUT") && Random.nextInt(0, 10) < 5) {
52+
if ((request.method == "POST" || request.method == "PUT") && Random.nextInt(0, 10) < 5) {
5353
chain.proceed(request)
5454
throw KubernetesClientException(
5555
"Already exist",

src/test/kotlin/eu/openanalytics/shinyproxyoperator/helpers/ShinyProxyTestInstance.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import eu.openanalytics.shinyproxyoperator.ingress.skipper.RouteGroup
3232
import io.fabric8.kubernetes.api.model.HasMetadata
3333
import io.fabric8.kubernetes.api.model.IntOrString
3434
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
35-
import io.fabric8.kubernetes.client.internal.readiness.Readiness
35+
import io.fabric8.kubernetes.client.readiness.Readiness
3636
import kotlinx.coroutines.TimeoutCancellationException
3737
import kotlinx.coroutines.withTimeout
3838
import java.nio.file.Paths

0 commit comments

Comments
 (0)