4444import io .fabric8 .kubernetes .api .model .ContainerPortBuilder ;
4545import io .fabric8 .kubernetes .api .model .EnvVar ;
4646import io .fabric8 .kubernetes .api .model .EnvVarSourceBuilder ;
47+ import io .fabric8 .kubernetes .api .model .GenericKubernetesResource ;
48+ import io .fabric8 .kubernetes .api .model .GenericKubernetesResourceList ;
4749import io .fabric8 .kubernetes .api .model .HasMetadata ;
4850import io .fabric8 .kubernetes .api .model .LocalObjectReference ;
4951import io .fabric8 .kubernetes .api .model .ObjectMetaBuilder ;
5052import io .fabric8 .kubernetes .api .model .Pod ;
5153import io .fabric8 .kubernetes .api .model .PodBuilder ;
54+ import io .fabric8 .kubernetes .api .model .ServiceBuilder ;
5255import io .fabric8 .kubernetes .api .model .PodSpec ;
5356import io .fabric8 .kubernetes .api .model .Quantity ;
5457import io .fabric8 .kubernetes .api .model .ResourceRequirementsBuilder ;
6669import io .fabric8 .kubernetes .client .DefaultKubernetesClient ;
6770import io .fabric8 .kubernetes .client .KubernetesClient ;
6871import io .fabric8 .kubernetes .client .dsl .LogWatch ;
72+ import io .fabric8 .kubernetes .client .dsl .NonNamespaceOperation ;
73+ import io .fabric8 .kubernetes .client .dsl .Resource ;
74+ import io .fabric8 .kubernetes .client .dsl .base .PatchContext ;
75+ import io .fabric8 .kubernetes .client .dsl .base .PatchType ;
6976import io .fabric8 .kubernetes .client .internal .readiness .Readiness ;
7077import io .fabric8 .kubernetes .client .utils .Serialization ;
7178import org .apache .commons .io .IOUtils ;
@@ -112,6 +119,8 @@ public class KubernetesBackend extends AbstractContainerBackend {
112119
113120 private static final String SECRET_KEY_REF = "secretKeyRef" ;
114121
122+ private static final String ANNOTATION_MANIFEST_POLICY = "openanalytics.eu/sp-additional-manifest-policy" ;
123+
115124 @ Inject
116125 private PodPatcher podPatcher ;
117126
@@ -279,14 +288,14 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
279288 int totalWaitMs = Integer .parseInt (environment .getProperty ("proxy.kubernetes.pod-wait-time" , "60000" ));
280289 int maxTries = totalWaitMs / 1000 ;
281290 Retrying .retry (i -> {
282- if (!Readiness .isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
291+ if (!Readiness .getInstance (). isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
283292 if (i > 1 && log != null ) log .debug (String .format ("Container not ready yet, trying again (%d/%d)" , i , maxTries ));
284293 return false ;
285294 }
286295 return true ;
287296 }
288297 , maxTries , 1000 );
289- if (!Readiness .isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
298+ if (!Readiness .getInstance (). isReady (kubeClient .resource (startedPod ).fromServer ().get ())) {
290299 Pod pod = kubeClient .resource (startedPod ).fromServer ().get ();
291300 container .getParameters ().put (PARAM_POD , pod );
292301 proxy .getContainers ().add (container );
@@ -302,19 +311,20 @@ protected Container startContainer(ContainerSpec spec, Proxy proxy) throws Excep
302311 .map (p -> new ServicePortBuilder ().withPort (p ).build ())
303312 .collect (Collectors .toList ());
304313
305- Service startupService = kubeClient .services ().inNamespace (effectiveKubeNamespace ).createNew ()
306- .withApiVersion (apiVersion )
307- .withKind ("Service" )
308- .withNewMetadata ()
309- .withName ("sp-service-" + container .getId ())
310- .withLabels (serviceLabels )
311- .endMetadata ()
312- .withNewSpec ()
313- .addToSelector ("app" , container .getId ())
314- .withType ("NodePort" )
315- .withPorts (servicePorts )
316- .endSpec ()
317- .done ();
314+ Service startupService = kubeClient .services ().inNamespace (effectiveKubeNamespace )
315+ .create (new ServiceBuilder ()
316+ .withApiVersion (apiVersion )
317+ .withKind ("Service" )
318+ .withNewMetadata ()
319+ .withName ("sp-service-" + container .getId ())
320+ .withLabels (serviceLabels )
321+ .endMetadata ()
322+ .withNewSpec ()
323+ .addToSelector ("app" , container .getId ())
324+ .withType ("NodePort" )
325+ .withPorts (servicePorts )
326+ .endSpec ()
327+ .build ());
318328
319329 // Workaround: waitUntilReady appears to be buggy.
320330 Retrying .retry (i -> isServiceReady (kubeClient .resource (startupService ).fromServer ().get ()), 60 , 1000 );
@@ -367,45 +377,76 @@ private JsonPatch readPatchFromSpec(ContainerSpec containerSpec, Proxy proxy) th
367377 *
368378 * The resource will only be created if it does not already exist.
369379 */
370- private void createAdditionalManifests (Proxy proxy , String namespace ) {
371- for (HasMetadata fullObject : getAdditionManifestsAsObjects (proxy , namespace )) {
372- if (kubeClient .resource (fullObject ).fromServer ().get () == null ) {
373- kubeClient .resource (fullObject ).createOrReplace ();
374- }
380+ private void createAdditionalManifests (Proxy proxy , String namespace ) throws JsonProcessingException {
381+ for (GenericKubernetesResource fullObject : getAdditionManifestsAsObjects (proxy , namespace )) {
382+ applyAdditionalManifest (fullObject );
375383 }
376- for (HasMetadata fullObject : getAdditionPersistentManifestsAsObjects (proxy , namespace )) {
377- if (kubeClient .resource (fullObject ).fromServer ().get () == null ) {
378- kubeClient .resource (fullObject ).createOrReplace ();
379- }
384+ for (GenericKubernetesResource fullObject : getAdditionPersistentManifestsAsObjects (proxy , namespace )) {
385+ applyAdditionalManifest (fullObject );
380386 }
381387 }
382388
383- private List <HasMetadata > getAdditionManifestsAsObjects (Proxy proxy , String namespace ) {
389+ private List <GenericKubernetesResource > getAdditionManifestsAsObjects (Proxy proxy , String namespace ) throws JsonProcessingException {
384390 SpecExpressionContext context = SpecExpressionContext .create (
385391 proxy , proxy .getSpec ());
386- return parseAdditionalManifests (context , proxy , namespace , proxy .getSpec ().getKubernetesAdditionalManifests ());
392+ return parseAdditionalManifests (context , namespace , proxy .getSpec ().getKubernetesAdditionalManifests ());
387393 }
388394
389- private List <HasMetadata > getAdditionPersistentManifestsAsObjects (Proxy proxy , String namespace ) {
395+ private List <GenericKubernetesResource > getAdditionPersistentManifestsAsObjects (Proxy proxy , String namespace ) throws JsonProcessingException {
390396 SpecExpressionContext context = SpecExpressionContext .create (
391397 proxy , proxy .getSpec (),
392398 userService .getCurrentAuth ().getPrincipal (), userService .getCurrentAuth ().getCredentials ());
393- return parseAdditionalManifests (context , proxy , namespace , proxy .getSpec ().getKubernetesAdditionalPersistentManifests ());
399+ return parseAdditionalManifests (context , namespace , proxy .getSpec ().getKubernetesAdditionalPersistentManifests ());
400+ }
401+
402+ private void applyAdditionalManifest (GenericKubernetesResource resource ) {
403+ NonNamespaceOperation <GenericKubernetesResource , GenericKubernetesResourceList , Resource <GenericKubernetesResource >> client
404+ = kubeClient .genericKubernetesResources (resource .getApiVersion (), resource .getKind ()).inNamespace (resource .getMetadata ().getNamespace ());
405+ String policy ;
406+ if (resource .getMetadata ().getAnnotations () != null ) {
407+ policy = resource .getMetadata ().getAnnotations ().getOrDefault (ANNOTATION_MANIFEST_POLICY , "CreateOnce" );
408+ } else {
409+ policy = "CreateOnce" ;
410+ }
411+ if (policy .equalsIgnoreCase ("CreateOnce" )) {
412+ if (kubeClient .resource (resource ).fromServer ().get () == null ) {
413+ client .create (resource );
414+ }
415+ } else if (policy .equalsIgnoreCase ("Patch" )) {
416+ if (kubeClient .resource (resource ).fromServer ().get () == null ) {
417+ client .create (resource );
418+ } else {
419+ client .patch (PatchContext .of (PatchType .JSON_MERGE ), resource );
420+ }
421+ } else if (policy .equalsIgnoreCase ("Delete" )) {
422+ if (kubeClient .resource (resource ).fromServer ().get () != null ) {
423+ kubeClient .resource (resource ).withGracePeriod (0 ).delete ();
424+ }
425+ } else if (policy .equalsIgnoreCase ("Replace" )) {
426+ if (kubeClient .resource (resource ).fromServer ().get () != null ) {
427+ kubeClient .resource (resource ).withGracePeriod (0 ).delete ();
428+ }
429+ client .create (resource );
430+ } else {
431+ log .warn ("Unknown manifest-policy: {}" , policy );
432+ }
394433 }
395434
396435 /**
397436 * Converts the additional manifests of the spec into HasMetadata objects.
398437 * When the resource has no namespace definition, the provided namespace
399438 * parameter will be used.
400439 */
401- private List <HasMetadata > parseAdditionalManifests (SpecExpressionContext context , Proxy proxy , String namespace , List <String > manifests ) {
402-
403- ArrayList <HasMetadata > result = new ArrayList <>();
440+ private List <GenericKubernetesResource > parseAdditionalManifests (SpecExpressionContext context , String namespace , List <String > manifests ) throws JsonProcessingException {
441+ ArrayList <GenericKubernetesResource > result = new ArrayList <>();
404442 for (String manifest : manifests ) {
405443 String expressionManifest = expressionResolver .evaluateToString (manifest , context );
406- HasMetadata object = Serialization .unmarshal ( new ByteArrayInputStream (expressionManifest . getBytes ())); // used to determine whether the manifest has specified a namespace
444+ GenericKubernetesResource object = Serialization .yamlMapper (). readValue (expressionManifest , GenericKubernetesResource . class );
407445
408- HasMetadata fullObject = kubeClient .load (new ByteArrayInputStream (expressionManifest .getBytes ())).get ().get (0 );
446+ GenericKubernetesResource fullObject = kubeClient
447+ .genericKubernetesResources (object .getApiVersion (), object .getKind ())
448+ .load (new ByteArrayInputStream (expressionManifest .getBytes ())).get ();
449+
409450 if (object .getMetadata ().getNamespace () == null ) {
410451 // the load method (in some cases) automatically sets a namespace when no namespace is provided
411452 // therefore we overwrite this namespace with the namespace of the pod.
@@ -458,13 +499,18 @@ protected void doStopProxy(Proxy proxy) throws Exception {
458499 }
459500
460501 Pod pod = Pod .class .cast (container .getParameters ().get (PARAM_POD ));
461- if (pod != null ) kubeClient .pods ().inNamespace (kubeNamespace ).delete (pod );
502+ if (pod != null ) {
503+ // specify gracePeriod 0, this was the default in previous version of the fabric8 k8s client
504+ kubeClient .resource (pod ).withGracePeriod (0 ).delete ();
505+ }
462506 Service service = Service .class .cast (container .getParameters ().get (PARAM_SERVICE ));
463- if (service != null ) kubeClient .services ().inNamespace (kubeNamespace ).delete (service );
507+ if (service != null ) {
508+ kubeClient .resource (service ).withGracePeriod (0 ).delete ();
509+ }
464510
465511 // delete additional manifests
466512 for (HasMetadata fullObject : getAdditionManifestsAsObjects (proxy , kubeNamespace )) {
467- kubeClient .resource (fullObject ).delete ();
513+ kubeClient .resource (fullObject ).withGracePeriod ( 0 ). delete ();
468514 }
469515 }
470516 }
0 commit comments