4444import io .fabric8 .kubernetes .api .model .ContainerPortBuilder ;
4545import io .fabric8 .kubernetes .api .model .EnvVar ;
4646import io .fabric8 .kubernetes .api .model .EnvVarSourceBuilder ;
47+ import io .fabric8 .kubernetes .api .model .GenericKubernetesResource ;
48+ import io .fabric8 .kubernetes .api .model .GenericKubernetesResourceList ;
4749import io .fabric8 .kubernetes .api .model .HasMetadata ;
4850import io .fabric8 .kubernetes .api .model .LocalObjectReference ;
4951import io .fabric8 .kubernetes .api .model .ObjectMetaBuilder ;
5052import io .fabric8 .kubernetes .api .model .Pod ;
5153import io .fabric8 .kubernetes .api .model .PodBuilder ;
54+ import io .fabric8 .kubernetes .api .model .ServiceBuilder ;
5255import io .fabric8 .kubernetes .api .model .PodSpec ;
5356import io .fabric8 .kubernetes .api .model .Quantity ;
5457import io .fabric8 .kubernetes .api .model .ResourceRequirementsBuilder ;
6669import io .fabric8 .kubernetes .client .DefaultKubernetesClient ;
6770import io .fabric8 .kubernetes .client .KubernetesClient ;
6871import io .fabric8 .kubernetes .client .dsl .LogWatch ;
72+ import io .fabric8 .kubernetes .client .dsl .MixedOperation ;
73+ import io .fabric8 .kubernetes .client .dsl .Resource ;
74+ import io .fabric8 .kubernetes .client .dsl .base .PatchContext ;
75+ import io .fabric8 .kubernetes .client .dsl .base .PatchType ;
6976import io .fabric8 .kubernetes .client .internal .readiness .Readiness ;
7077import io .fabric8 .kubernetes .client .utils .Serialization ;
7178import org .apache .commons .io .IOUtils ;
@@ -112,6 +119,8 @@ public class KubernetesBackend extends AbstractContainerBackend {
112119
113120 private static final String SECRET_KEY_REF = "secretKeyRef" ;
114121
122+ private static final String ANNOTATION_MANIFEST_POLICY = "openanalytics.eu/sp-additional-manifest-policy" ;
123+
115124 @ Inject
116125 private PodPatcher podPatcher ;
117126
@@ -279,14 +288,14 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
279288 int totalWaitMs = Integer .parseInt (environment .getProperty ("proxy.kubernetes.pod-wait-time" , "60000" ));
280289 int maxTries = totalWaitMs / 1000 ;
281290 Retrying .retry (i -> {
282- if (!Readiness .isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
291+ if (!Readiness .getInstance (). isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
283292 if (i > 1 && log != null ) log .debug (String .format ("Container not ready yet, trying again (%d/%d)" , i , maxTries ));
284293 return false ;
285294 }
286295 return true ;
287296 }
288297 , maxTries , 1000 );
289- if (!Readiness .isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
298+ if (!Readiness .getInstance (). isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
290299 Pod pod = kubeClient .resource (startedPod ).fromServer ().get ();
291300 container .getParameters ().put (PARAM_POD , pod );
292301 proxy .getContainers ().add (container );
@@ -302,19 +311,20 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
302311 .map (p -> new ServicePortBuilder ().withPort (p ).build ())
303312 .collect (Collectors .toList ());
304313
305- Service startupService = kubeClient .services ().inNamespace (effectiveKubeNamespace ).createNew ()
306- .withApiVersion (apiVersion )
307- .withKind ("Service" )
308- .withNewMetadata ()
309- .withName ("sp-service-" + container .getId ())
310- .withLabels (serviceLabels )
311- .endMetadata ()
312- .withNewSpec ()
313- .addToSelector ("app" , container .getId ())
314- .withType ("NodePort" )
315- .withPorts (servicePorts )
316- .endSpec ()
317- .done ();
314+ Service startupService = kubeClient .services ().inNamespace (effectiveKubeNamespace )
315+ .create (new ServiceBuilder ()
316+ .withApiVersion (apiVersion )
317+ .withKind ("Service" )
318+ .withNewMetadata ()
319+ .withName ("sp-service-" + container .getId ())
320+ .withLabels (serviceLabels )
321+ .endMetadata ()
322+ .withNewSpec ()
323+ .addToSelector ("app" , container .getId ())
324+ .withType ("NodePort" )
325+ .withPorts (servicePorts )
326+ .endSpec ()
327+ .build ());
318328
319329 // Workaround: waitUntilReady appears to be buggy.
320330 Retrying .retry (i -> isServiceReady (kubeClient .resource (startupService ).fromServer ().get ()), 60 , 1000 );
@@ -367,45 +377,80 @@ private JsonPatch readPatchFromSpec(ContainerSpec containerSpec, Proxy proxy) th
367377 *
368378 * The resource will only be created if it does not already exist.
369379 */
370- private void createAdditionalManifests (Proxy proxy , String namespace ) {
371- for (HasMetadata fullObject : getAdditionManifestsAsObjects (proxy , namespace )) {
372- if (kubeClient .resource (fullObject ).fromServer ().get () == null ) {
373- kubeClient .resource (fullObject ).createOrReplace ();
374- }
380+ private void createAdditionalManifests (Proxy proxy , String namespace ) throws JsonProcessingException {
381+ for (GenericKubernetesResource fullObject : getAdditionManifestsAsObjects (proxy , namespace )) {
382+ applyAdditionalManifest (fullObject );
375383 }
376- for (HasMetadata fullObject : getAdditionPersistentManifestsAsObjects (proxy , namespace )) {
377- if (kubeClient .resource (fullObject ).fromServer ().get () == null ) {
378- kubeClient .resource (fullObject ).createOrReplace ();
379- }
384+ for (GenericKubernetesResource fullObject : getAdditionPersistentManifestsAsObjects (proxy , namespace )) {
385+ applyAdditionalManifest (fullObject );
380386 }
381387 }
382388
383- private List <HasMetadata > getAdditionManifestsAsObjects (Proxy proxy , String namespace ) {
389+ private List <GenericKubernetesResource > getAdditionManifestsAsObjects (Proxy proxy , String namespace ) throws JsonProcessingException {
384390 SpecExpressionContext context = SpecExpressionContext .create (
385391 proxy , proxy .getSpec ());
386- return parseAdditionalManifests (context , proxy , namespace , proxy .getSpec ().getKubernetesAdditionalManifests ());
392+ return parseAdditionalManifests (context , namespace , proxy .getSpec ().getKubernetesAdditionalManifests ());
387393 }
388394
389- private List <HasMetadata > getAdditionPersistentManifestsAsObjects (Proxy proxy , String namespace ) {
395+ private List <GenericKubernetesResource > getAdditionPersistentManifestsAsObjects (Proxy proxy , String namespace ) throws JsonProcessingException {
390396 SpecExpressionContext context = SpecExpressionContext .create (
391397 proxy , proxy .getSpec (),
392398 userService .getCurrentAuth ().getPrincipal (), userService .getCurrentAuth ().getCredentials ());
393- return parseAdditionalManifests (context , proxy , namespace , proxy .getSpec ().getKubernetesAdditionalPersistentManifests ());
399+ return parseAdditionalManifests (context , namespace , proxy .getSpec ().getKubernetesAdditionalPersistentManifests ());
400+ }
401+
402+ private void applyAdditionalManifest (GenericKubernetesResource resource ) {
403+ MixedOperation <GenericKubernetesResource , GenericKubernetesResourceList , Resource <GenericKubernetesResource >> client = kubeClient .genericKubernetesResources (resource .getApiVersion (), resource .getKind ());
404+ String policy ;
405+ if (resource .getMetadata ().getAnnotations () != null ) {
406+ policy = resource .getMetadata ().getAnnotations ().getOrDefault (ANNOTATION_MANIFEST_POLICY , "CreateOnce" );
407+ } else {
408+ policy = "CreateOnce" ;
409+ }
410+ log .info ("Applying persistent manifest (name: {}, policy: {})" , resource .getMetadata ().getName (), policy );
411+ if (policy .equalsIgnoreCase ("CreateOnce" )) {
412+ if (kubeClient .resource (resource ).fromServer ().get () == null ) {
413+ client .create (resource );
414+ }
415+ } else if (policy .equalsIgnoreCase ("Patch" )) {
416+ if (kubeClient .resource (resource ).fromServer ().get () == null ) {
417+ client .create (resource );
418+ } else {
419+ client .patch (PatchContext .of (PatchType .JSON_MERGE ), resource );
420+ }
421+ } else if (policy .equalsIgnoreCase ("Delete" )) {
422+ if (kubeClient .resource (resource ).fromServer ().get () != null ) {
423+ client .delete (resource );
424+ }
425+ } else if (policy .equalsIgnoreCase ("Replace" )) {
426+ if (kubeClient .resource (resource ).fromServer ().get () != null ) {
427+ client .delete (resource );
428+ log .info ("Waiting for deletion to finish" );
429+ // wait 60 seconds for deletion
430+ Retrying .retry ((idx ) -> kubeClient .resource (resource ).fromServer ().get () == null , 1000 , 60 );
431+ log .info ("Done" );
432+ }
433+ client .create (resource );
434+ } else {
435+ log .warn ("Unknown manifest-policy: {}" , policy );
436+ }
394437 }
395438
396439 /**
397440 * Converts the additional manifests of the spec into HasMetadata objects.
398441 * When the resource has no namespace definition, the provided namespace
399442 * parameter will be used.
400443 */
401- private List <HasMetadata > parseAdditionalManifests (SpecExpressionContext context , Proxy proxy , String namespace , List <String > manifests ) {
402-
403- ArrayList <HasMetadata > result = new ArrayList <>();
444+ private List <GenericKubernetesResource > parseAdditionalManifests (SpecExpressionContext context , String namespace , List <String > manifests ) throws JsonProcessingException {
445+ ArrayList <GenericKubernetesResource > result = new ArrayList <>();
404446 for (String manifest : manifests ) {
405447 String expressionManifest = expressionResolver .evaluateToString (manifest , context );
406- HasMetadata object = Serialization .unmarshal ( new ByteArrayInputStream (expressionManifest . getBytes ())); // used to determine whether the manifest has specified a namespace
448+ GenericKubernetesResource object = Serialization .yamlMapper (). readValue (expressionManifest , GenericKubernetesResource . class );
407449
408- HasMetadata fullObject = kubeClient .load (new ByteArrayInputStream (expressionManifest .getBytes ())).get ().get (0 );
450+ GenericKubernetesResource fullObject = kubeClient
451+ .genericKubernetesResources (object .getApiVersion (), object .getKind ())
452+ .load (new ByteArrayInputStream (expressionManifest .getBytes ())).get ();
453+
409454 if (object .getMetadata ().getNamespace () == null ) {
410455 // the load method (in some cases) automatically sets a namespace when no namespace is provided
411456 // therefore we overwrite this namespace with the namespace of the pod.
0 commit comments