@@ -28,33 +28,28 @@ import eu.openanalytics.shinyproxyoperator.controller.ResourceRetriever
2828import eu.openanalytics.shinyproxyoperator.controller.ShinyProxyController
2929import eu.openanalytics.shinyproxyoperator.controller.ShinyProxyEvent
3030import eu.openanalytics.shinyproxyoperator.controller.ShinyProxyListener
31- import eu.openanalytics.shinyproxyoperator.crd.DoneableShinyProxy
3231import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
33- import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyList
3432import eu.openanalytics.shinyproxyoperator.ingress.skipper.IngressController
3533import io.fabric8.kubernetes.api.model.ConfigMap
3634import io.fabric8.kubernetes.api.model.ConfigMapList
3735import io.fabric8.kubernetes.api.model.Service
3836import io.fabric8.kubernetes.api.model.ServiceList
3937import io.fabric8.kubernetes.api.model.apps.ReplicaSet
4038import io.fabric8.kubernetes.api.model.apps.ReplicaSetList
41- import io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress
42- import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressList
4339import io.fabric8.kubernetes.client.DefaultKubernetesClient
4440import io.fabric8.kubernetes.client.KubernetesClientException
4541import io.fabric8.kubernetes.client.NamespacedKubernetesClient
46- import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext
47- import io.fabric8.kubernetes.client.dsl.base.OperationContext
48- import io.fabric8.kubernetes.client.informers.SharedIndexInformer
49- import io.fabric8.kubernetes.client.informers.SharedInformerFactory
42+ import io.fabric8.kubernetes.client.dsl.Resource
43+ import io.fabric8.kubernetes.client.dsl.RollableScalableResource
44+ import io.fabric8.kubernetes.client.dsl.ServiceResource
5045import io.fabric8.kubernetes.client.informers.cache.Lister
5146import io.fabric8.kubernetes.client.utils.Serialization
5247import kotlinx.coroutines.channels.Channel
5348import kotlinx.coroutines.channels.SendChannel
5449import mu.KotlinLogging
50+ import org.apache.logging.log4j.Level
5551import org.apache.logging.log4j.core.config.Configurator
5652import java.util.*
57- import org.apache.logging.log4j.Level
5853import kotlin.concurrent.schedule
5954import kotlin.system.exitProcess
6055
@@ -80,23 +75,20 @@ class Operator(client: NamespacedKubernetesClient? = null,
8075 val startupProbeInitialDelay: Int
8176 val processMaxLifetime: Long
8277
83- private val podSetCustomResourceDefinitionContext = CustomResourceDefinitionContext .Builder ()
84- .withVersion(" v1alpha1" )
85- .withScope(" Namespaced" )
86- .withGroup(" openanalytics.eu" )
87- .withPlural(" shinyproxies" )
88- .build()
89-
90- private val informerFactory: SharedInformerFactory
91- private val replicaSetInformer: SharedIndexInformer <ReplicaSet >
92- private val serviceInformer: SharedIndexInformer <Service >
93- private val configMapInformer: SharedIndexInformer <ConfigMap >
94- private val ingressInformer: SharedIndexInformer <Ingress >
95- private val shinyProxyInformer: SharedIndexInformer <ShinyProxy >
9678 val podRetriever: PodRetriever
79+ private val shinyProxyClient: ShinyProxyClient
80+
81+ private val shinyProxyListener: ShinyProxyListener
82+ private val replicaSetListener: ResourceListener <ReplicaSet , ReplicaSetList , RollableScalableResource <ReplicaSet >>
83+ private val serviceListener: ResourceListener <Service , ServiceList , ServiceResource <Service >>
84+ private val configMapListener: ResourceListener <ConfigMap , ConfigMapList , Resource <ConfigMap >>
85+ private val ingressController: IngressController
86+
87+ private val channel = Channel <ShinyProxyEvent >(10000 )
88+ val sendChannel: SendChannel <ShinyProxyEvent > = channel // public for tests
9789
9890 /* *
99- * Initialize mode, client, namespace and informers
91+ * Initialize mode, client, namespace and listeners
10092 */
10193 init {
10294 Serialization .jsonMapper().registerKotlinModule()
@@ -123,13 +115,13 @@ class Operator(client: NamespacedKubernetesClient? = null,
123115 if (this .processMaxLifetime != - 1L ) {
124116 Timer ().schedule(this .processMaxLifetime * 60 * 1000 ) {
125117 logger.warn { " Max lifetime of process reached, preparing shutdown" }
126- sendChannel.close();
118+ sendChannel.close()
127119 while (! channel.isClosedForReceive && ! channel.isEmpty && ! shinyProxyController.idle) {
128120 logger.warn { " Still processing events in queue, delaying shutdown" }
129121 Thread .sleep(250 )
130122 }
131123 logger.warn { " Queue is empty, exiting process" }
132- exitProcess(1 );
124+ exitProcess(1 )
133125 }
134126 }
135127
@@ -146,79 +138,42 @@ class Operator(client: NamespacedKubernetesClient? = null,
146138 }
147139 logger.info { " Using namespace : $namespace " }
148140
149- informerFactory = when (this .mode) {
150- Mode .CLUSTERED -> this .client.inAnyNamespace().informers( )
151- Mode .NAMESPACED -> this .client.inNamespace(namespace).informers( )
141+ this .shinyProxyClient = when (this .mode) {
142+ Mode .CLUSTERED -> this .client.inAnyNamespace().resources( ShinyProxy :: class .java )
143+ Mode .NAMESPACED -> this .client.inNamespace(namespace).resources( ShinyProxy :: class .java )
152144 }
153145
146+ shinyProxyListener = ShinyProxyListener (sendChannel, this .shinyProxyClient)
147+ podRetriever = PodRetriever (this .client)
148+
154149 if (this .mode == Mode .CLUSTERED ) {
155- replicaSetInformer = informerFactory.sharedIndexInformerFor(ReplicaSet ::class .java, ReplicaSetList ::class .java, 10 * 60 * 1000 .toLong())
156- serviceInformer = informerFactory.sharedIndexInformerFor(Service ::class .java, ServiceList ::class .java, 10 * 60 * 1000 .toLong())
157- configMapInformer = informerFactory.sharedIndexInformerFor(ConfigMap ::class .java, ConfigMapList ::class .java, 10 * 60 * 1000 .toLong())
158- ingressInformer = informerFactory.sharedIndexInformerFor(Ingress ::class .java, IngressList ::class .java, 10 * 60 * 1000 .toLong())
159- shinyProxyInformer = informerFactory.sharedIndexInformerForCustomResource(podSetCustomResourceDefinitionContext, ShinyProxy ::class .java, ShinyProxyList ::class .java, 10 * 60 * 1000 .toLong())
160- podRetriever = PodRetriever (this .client)
150+ replicaSetListener = ResourceListener (sendChannel, this .client.inAnyNamespace().apps().replicaSets())
151+ serviceListener = ResourceListener (sendChannel, this .client.inAnyNamespace().services())
152+ configMapListener = ResourceListener (sendChannel, this .client.inAnyNamespace().configMaps())
153+ ingressController = IngressController (sendChannel, this .client, this .client.inAnyNamespace().network().ingress())
161154 } else {
162- val operationContext = OperationContext ().withNamespace(namespace)
163- replicaSetInformer = informerFactory.sharedIndexInformerFor(ReplicaSet ::class .java, ReplicaSetList ::class .java, operationContext, 10 * 60 * 1000 .toLong())
164- serviceInformer = informerFactory.sharedIndexInformerFor(Service ::class .java, ServiceList ::class .java, operationContext, 10 * 60 * 1000 .toLong())
165- configMapInformer = informerFactory.sharedIndexInformerFor(ConfigMap ::class .java, ConfigMapList ::class .java, operationContext, 10 * 60 * 1000 .toLong())
166- ingressInformer = informerFactory.sharedIndexInformerFor(Ingress ::class .java, IngressList ::class .java, operationContext, 10 * 60 * 1000 .toLong())
167- shinyProxyInformer = informerFactory.sharedIndexInformerForCustomResource(podSetCustomResourceDefinitionContext, ShinyProxy ::class .java, ShinyProxyList ::class .java, operationContext, 10 * 60 * 1000 .toLong())
168- podRetriever = PodRetriever (this .client)
155+ replicaSetListener = ResourceListener (sendChannel, this .client.inNamespace(namespace).apps().replicaSets())
156+ serviceListener = ResourceListener (sendChannel, this .client.inNamespace(namespace).services())
157+ configMapListener = ResourceListener (sendChannel, this .client.inNamespace(namespace).configMaps())
158+ ingressController = IngressController (sendChannel, this .client, this .client.inNamespace(namespace).network().ingress())
169159 }
170160
171161 Timer ().schedule(5000 , 5000 ) {
172162 val num = (getOperatorInstance().client as DefaultKubernetesClient ).httpClient.connectionPool().connectionCount()
173163 val max = (getOperatorInstance().client as DefaultKubernetesClient ).configuration.maxConcurrentRequests
174- logger.warn { " Current number of connections: $num of $max " }
164+ logger.warn { " Current number of connections: $num of $max " }
175165 }
176166 }
177167
178- /* *
179- * Main Components
180- */
181- private val shinyProxyClient = when (this .mode) {
182- Mode .CLUSTERED -> this .client.customResources(podSetCustomResourceDefinitionContext, ShinyProxy ::class .java, ShinyProxyList ::class .java, DoneableShinyProxy ::class .java)
183- Mode .NAMESPACED -> this .client.inNamespace(namespace).customResources(podSetCustomResourceDefinitionContext, ShinyProxy ::class .java, ShinyProxyList ::class .java, DoneableShinyProxy ::class .java)
184- }
185- private val channel = Channel <ShinyProxyEvent >(10000 )
186- val sendChannel: SendChannel <ShinyProxyEvent > = channel // public for tests
187-
188- /* *
189- * Listers
190- */
191- private val shinyProxyLister = Lister (shinyProxyInformer.indexer)
192- private val replicaSetLister = Lister (replicaSetInformer.indexer)
193- private val configMapLister = Lister (configMapInformer.indexer)
194- private val serviceLister = Lister (serviceInformer.indexer)
195- private val ingressLister = Lister (ingressInformer.indexer)
196-
197- /* *
198- * Listeners
199- * Note: it is normal that these are unused, since they only perform background processing
200- */
201- private val shinyProxyListener = ShinyProxyListener (sendChannel, shinyProxyInformer, shinyProxyLister)
202- private val replicaSetListener = ResourceListener (sendChannel, replicaSetInformer, shinyProxyLister)
203- private val serviceListener = ResourceListener (sendChannel, serviceInformer, shinyProxyLister)
204- private val configMapListener = ResourceListener (sendChannel, configMapInformer, shinyProxyLister)
205-
206- /* *
207- * Helpers
208- */
209- private val resourceRetriever = ResourceRetriever (replicaSetLister, configMapLister, serviceLister, ingressLister)
210-
211168 /* *
212169 * Controllers
213170 */
214- private val ingressController = IngressController (channel, ingressInformer, shinyProxyLister, this .client, resourceRetriever)
215- val shinyProxyController = ShinyProxyController (channel, this .client, shinyProxyClient, replicaSetInformer, shinyProxyInformer, ingressController, resourceRetriever, shinyProxyLister, podRetriever, reconcileListener)
171+ val shinyProxyController = ShinyProxyController (channel, this .client, shinyProxyClient, ingressController, podRetriever, reconcileListener)
216172
217-
218- fun prepare () {
219- logger.info(" Starting ShinyProxy Operator" )
173+ fun prepare (): Pair <ResourceRetriever , Lister <ShinyProxy >> {
174+ logger.info { " Starting background processes of ShinyProxy Operator" }
220175 try {
221- if (client.customResourceDefinitions().withName(" shinyproxies.openanalytics.eu" ).get() == null ) {
176+ if (client.apiextensions().v1beta1(). customResourceDefinitions().withName(" shinyproxies.openanalytics.eu" ).get() == null ) {
222177 println ()
223178 println ()
224179 println (" ERROR: the CustomResourceDefinition (CRD) of the Operator does not exist!" )
@@ -240,22 +195,19 @@ class Operator(client: NamespacedKubernetesClient? = null,
240195 println ()
241196 }
242197
243- informerFactory.startAllRegisteredInformers()
244-
245- informerFactory.addSharedInformerEventListener {
246- // TODO exit when KubernetesClientException ?
247- logger.warn(it) { " Exception occurred, but caught $it " }
248- }
249-
250- while (! replicaSetInformer.hasSynced() || ! shinyProxyInformer.hasSynced()) {
251- // Wait till Informer syncs
252- }
198+ val shinyProxyLister = Lister (shinyProxyListener.start())
199+ val replicaSetLister = Lister (replicaSetListener.start(shinyProxyLister))
200+ val serviceLister = Lister (serviceListener.start(shinyProxyLister))
201+ val configMapLister = Lister (configMapListener.start(shinyProxyLister))
202+ val ingressLister = Lister (ingressController.start(shinyProxyLister))
203+ val resourceRetriever = ResourceRetriever (replicaSetLister, configMapLister, serviceLister, ingressLister)
253204
205+ return resourceRetriever to shinyProxyLister
254206 }
255207
256- suspend fun run () {
257- prepare()
258- shinyProxyController.run ()
208+ suspend fun run (resourceRetriever : ResourceRetriever , shinyProxyLister : Lister < ShinyProxy > ) {
209+ logger.info { " Starting ShinyProxy Operator " }
210+ shinyProxyController.run (resourceRetriever, shinyProxyLister )
259211 }
260212
261213 companion object {
0 commit comments