From 055624715ebcb65c2d4e8f92d12f364c2db683f0 Mon Sep 17 00:00:00 2001 From: Monika Jakhar Date: Thu, 23 Apr 2026 21:23:36 +0530 Subject: [PATCH] feat: support graceful scale-down for AlluxioRuntime using AdvancedStatefulSet (#4193) Signed-off-by: Monika Jakhar --- pkg/ddc/alluxio/const.go | 4 + pkg/ddc/alluxio/operations/decommission.go | 86 +++++++ .../alluxio/operations/decommission_test.go | 227 ++++++++++++++++++ pkg/ddc/alluxio/replicas.go | 114 ++++++++- pkg/ddc/alluxio/replicas_drain_test.go | 185 ++++++++++++++ pkg/features/features.go | 46 ++++ test/gha-e2e/curvine/read_job.yaml | 9 +- 7 files changed, 669 insertions(+), 2 deletions(-) create mode 100644 pkg/ddc/alluxio/operations/decommission.go create mode 100644 pkg/ddc/alluxio/operations/decommission_test.go create mode 100644 pkg/ddc/alluxio/replicas_drain_test.go create mode 100644 pkg/features/features.go diff --git a/pkg/ddc/alluxio/const.go b/pkg/ddc/alluxio/const.go index 833988ec453..57d6a101bc0 100644 --- a/pkg/ddc/alluxio/const.go +++ b/pkg/ddc/alluxio/const.go @@ -57,6 +57,10 @@ const ( defaultGracefulShutdownLimits int32 = 3 defaultCleanCacheGracePeriodSeconds int32 = 60 + // defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the + // runtime spec does not override alluxio.worker.rpc.port. + defaultWorkerRPCPort = 29999 + MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE" ConfigmapStorageName = "configmap" ) diff --git a/pkg/ddc/alluxio/operations/decommission.go b/pkg/ddc/alluxio/operations/decommission.go new file mode 100644 index 00000000000..61c0a7b7b13 --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission.go @@ -0,0 +1,86 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import "strings" + +// DecommissionWorkers signals the Alluxio master to decommission the given +// workers. Each address must be in ":" form. +// The call is idempotent: re-issuing it against an already-decommissioned +// worker is safe. +// +// Requires Alluxio >= 2.9, where "fsadmin decommissionWorker" was introduced; +// against older masters this subcommand does not exist. +func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error { + if len(addresses) == 0 { + return nil + } + command := []string{ + "alluxio", "fsadmin", "decommissionWorker", + "--addresses", strings.Join(addresses, ","), + } + _, _, err := a.exec(command, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses) + } + return err +} + +// CountActiveWorkers returns the number of live workers according to +// "alluxio fsadmin report capacity -live". The "-live" flag is what makes +// this safe to compare against immediately after a decommission: it asks the +// master for currently live workers rather than every worker it still has a +// record of, so a worker that was just decommissioned doesn't linger in the +// count until its heartbeat times out. +func (a AlluxioFileUtils) CountActiveWorkers() (int, error) { + report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity", "-live"}, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed") + return 0, err + } + return parseActiveWorkerCount(report), nil +} + +// parseActiveWorkerCount counts workers in the capacity report produced by +// "alluxio fsadmin report capacity". Worker entries begin at the non-indented +// line after the "Worker Name" header; the indented line that follows each +// entry contains the used-capacity detail. +// +// Worker Name Last Heartbeat Storage MEM +// 192.168.1.147 0 capacity 2048.00MB <- worker entry +// used 443.89MB (21%) <- detail, indented +// 192.168.1.146 0 capacity 2048.00MB <- worker entry +// used 0B (0%) +func parseActiveWorkerCount(report string) int { + inWorkerSection := false + count := 0 + for _, line := range strings.Split(report, "\n") { + if strings.HasPrefix(line, "Worker Name") { + inWorkerSection = true + continue + } + if !inWorkerSection || strings.TrimSpace(line) == "" { + continue + } + // Non-indented lines are new worker entries; indented lines are + // the used-capacity continuation for the previous entry. + if line[0] != ' ' && line[0] != '\t' { + count++ + } + } + return count +} diff --git a/pkg/ddc/alluxio/operations/decommission_test.go b/pkg/ddc/alluxio/operations/decommission_test.go new file mode 100644 index 00000000000..9b82ff9c966 --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission_test.go @@ -0,0 +1,227 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "errors" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("empty address list is a no-op", func(t *testing.T) { + if err := a.DecommissionWorkers(nil); err != nil { + t.Fatalf("want nil, got: %v", err) + } + if err := a.DecommissionWorkers([]string{}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + }) + + t.Run("exec error is propagated", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil { + t.Error("want error, got nil") + } + }) + + t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + addr := "192.168.1.1:29999" + if err := a.DecommissionWorkers([]string{addr}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == addr { + found = true + break + } + } + if !found { + t.Errorf("address %q not found in command: %v", addr, capturedCmd) + } + }) + + t.Run("multiple addresses are joined with commas", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == "10.0.0.1:29999,10.0.0.2:29999" { + found = true + break + } + } + if !found { + t.Errorf("joined addresses not found in command: %v", capturedCmd) + } + }) +} + +func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("exec error returns zero and the error", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err == nil { + t.Error("want error, got nil") + } + if count != 0 { + t.Errorf("want 0 on error, got %d", count) + } + }) + + t.Run("requests live workers only", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + if _, err := a.CountActiveWorkers(); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == "-live" { + found = true + break + } + } + if !found { + t.Errorf("-live flag not found in command: %v", capturedCmd) + } + }) + + t.Run("two active workers", func(t *testing.T) { + report := `Capacity information for all workers: + Total Capacity: 4096.00MB + Used Capacity: 443.89MB + +Worker Name Last Heartbeat Storage MEM +192.168.1.147 0 capacity 2048.00MB + used 443.89MB (21%) +192.168.1.146 0 capacity 2048.00MB + used 0B (0%) +` + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return report, "", nil + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err != nil { + t.Fatalf("want nil, got: %v", err) + } + if count != 2 { + t.Errorf("want 2, got %d", count) + } + }) +} + +func TestParseActiveWorkerCount(t *testing.T) { + cases := []struct { + name string + input string + expect int + }{ + { + name: "empty report", + input: "", + expect: 0, + }, + { + name: "no worker section header", + input: "Capacity information for all workers:\n Total Capacity: 0B\n", + expect: 0, + }, + { + name: "single worker", + input: `Worker Name Last Heartbeat Storage MEM +192.168.1.1 0 capacity 1024.00MB + used 0B (0%) +`, + expect: 1, + }, + { + name: "three workers", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 2048.00MB + used 100MB (5%) +10.0.0.2 0 capacity 2048.00MB + used 0B (0%) +10.0.0.3 0 capacity 2048.00MB + used 500MB (25%) +`, + expect: 3, + }, + { + name: "trailing blank lines are ignored", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 1024.00MB + used 0B (0%) + + +`, + expect: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseActiveWorkerCount(tc.input) + if got != tc.expect { + t.Errorf("want %d, got %d", tc.expect, got) + } + }) + } +} diff --git a/pkg/ddc/alluxio/replicas.go b/pkg/ddc/alluxio/replicas.go index f149578869f..e7fbeacf525 100644 --- a/pkg/ddc/alluxio/replicas.go +++ b/pkg/ddc/alluxio/replicas.go @@ -18,19 +18,30 @@ package alluxio import ( "context" + stderrors "errors" "fmt" "reflect" data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ctrl" + "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/features" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" ) +// errWorkersNotYetDrained marks the normal, transient state during scale-in +// where the targeted workers have not finished migrating their cached blocks +// to the surviving workers yet. It lets the caller log this at Info level +// instead of Error, while still propagating a non-nil error so the existing +// fixed-interval reconcile requeue (see runtime_controller.go) kicks in. +var errWorkersNotYetDrained = stderrors.New("workers not yet drained") + // SyncReplicas syncs the replicas func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err error) { err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -88,12 +99,113 @@ func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err return err } runtimeToUpdate := runtime.DeepCopy() + + // When the GracefulWorkerScaleDown feature is enabled and we detect a + // scale-in, decommission the targeted workers before the StatefulSet + // controller terminates them. This gives the Alluxio master a chance to + // migrate their cached blocks to the surviving workers. The reconciler + // requeues until the active worker count has dropped to the desired + // level. + // + // workers.Status.Replicas (the number of Pods the StatefulSet controller + // has actually created) is used rather than workers.Spec.Replicas: the + // spec is the target this engine itself lowers once a drain succeeds, so + // relying on it could under-count pods that still exist but whose spec + // update already landed. + if utilfeature.DefaultFeatureGate.Enabled(features.GracefulWorkerScaleDown) && + runtime.Replicas() < workers.Status.Replicas { + + drained, drainErr := e.drainScalingDownWorkers(ctx, runtime, runtime.Replicas(), workers.Status.Replicas) + if drainErr != nil { + return drainErr + } + if !drained { + return fmt.Errorf("%w: scale-in to %d replicas will resume on next reconcile", + errWorkersNotYetDrained, runtime.Replicas()) + } + } + err = e.Helper.SyncReplicas(ctx, runtimeToUpdate, runtimeToUpdate.Status, workers) return err }) if err != nil { - _ = utils.LoggingErrorExceptConflict(e.Log, err, "Failed to sync replicas", types.NamespacedName{Namespace: e.namespace, Name: e.name}) + if stderrors.Is(err, errWorkersNotYetDrained) { + e.Log.Info(err.Error(), "name", e.name, "namespace", e.namespace) + } else { + _ = utils.LoggingErrorExceptConflict(e.Log, err, "Failed to sync replicas", types.NamespacedName{Namespace: e.namespace, Name: e.name}) + } } return } + +// drainScalingDownWorkers decommissions the Alluxio workers that are about to be +// removed when scaling from currentReplicas down to desiredReplicas. +// +// A standard StatefulSet removes the highest-ordinal pods first, so the targets +// are ordinals [desiredReplicas, currentReplicas). The function issues a +// decommission request via the master and returns whether Alluxio's active +// worker count has already dropped to the desired level. +func (e *AlluxioEngine) drainScalingDownWorkers(ctx context.Context, runtime *data.AlluxioRuntime, desiredReplicas, currentReplicas int32) (bool, error) { + masterPodName, masterContainerName := e.getMasterPodInfo() + fileUtils := operations.NewAlluxioFileUtils(masterPodName, masterContainerName, e.namespace, e.Log) + + workerRPCPort := e.getWorkerRPCPort(runtime) + workerStsName := e.getWorkerName() + + // Collect RPC addresses of the pods that will be terminated on scale-down. + // The worker registers with the master under its node's IP (see the + // ALLUXIO_WORKER_HOSTNAME wiring in charts/alluxio, which sources + // alluxio.worker.hostname from status.hostIP), not its pod IP, so that is + // the identity "fsadmin decommissionWorker" must be addressed by. + var toDecommission []string + for ord := desiredReplicas; ord < currentReplicas; ord++ { + podName := fmt.Sprintf("%s-%d", workerStsName, ord) + pod := &corev1.Pod{} + if err := e.Client.Get(ctx, + types.NamespacedName{Name: podName, Namespace: e.namespace}, pod); err != nil { + if errors.IsNotFound(err) { + // Pod is already gone; nothing to decommission here. + continue + } + return false, err + } + if pod.Status.HostIP == "" { + e.Log.Info("Worker pod has no host IP yet, will retry", "pod", podName) + return false, nil + } + toDecommission = append(toDecommission, + fmt.Sprintf("%s:%d", pod.Status.HostIP, workerRPCPort)) + } + + if len(toDecommission) == 0 { + // All targeted pods are already gone from the cluster. + return true, nil + } + + if err := fileUtils.DecommissionWorkers(toDecommission); err != nil { + return false, err + } + + activeCount, err := fileUtils.CountActiveWorkers() + if err != nil { + return false, err + } + + if int32(activeCount) > desiredReplicas { + e.Log.Info("Workers are still draining, will retry", + "activeWorkers", activeCount, "desired", desiredReplicas) + return false, nil + } + + return true, nil +} + +// getWorkerRPCPort returns the configured Alluxio worker RPC port, falling back +// to the Alluxio default when the runtime does not override it. +func (e *AlluxioEngine) getWorkerRPCPort(runtime *data.AlluxioRuntime) int { + if port, ok := runtime.Spec.Worker.Ports["rpc"]; ok && port > 0 { + return port + } + return defaultWorkerRPCPort +} diff --git a/pkg/ddc/alluxio/replicas_drain_test.go b/pkg/ddc/alluxio/replicas_drain_test.go new file mode 100644 index 00000000000..63e69945862 --- /dev/null +++ b/pkg/ddc/alluxio/replicas_drain_test.go @@ -0,0 +1,185 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package alluxio + +import ( + "context" + "errors" + "fmt" + + "github.com/agiledragon/gomonkey/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +const testDrainWorkerSts = "drain-worker" +const testDrainNamespace = "fluid" + +var _ = Describe("AlluxioEngine drainScalingDownWorkers", Label("pkg.ddc.alluxio.replicas_drain_test.go"), func() { + var ( + engine *AlluxioEngine + rt *v1alpha1.AlluxioRuntime + ) + + BeforeEach(func() { + rt = &v1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: testDrainWorkerSts, + Namespace: testDrainNamespace, + }, + } + }) + + newEngineWithPods := func(pods ...*corev1.Pod) *AlluxioEngine { + objs := []runtime.Object{} + for _, p := range pods { + objs = append(objs, p.DeepCopy()) + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, objs...) + return newAlluxioEngineREP(fakeClient, testDrainWorkerSts, testDrainNamespace) + } + + // hostIP mirrors status.hostIP, which is what ALLUXIO_WORKER_HOSTNAME (and + // therefore the worker's registered identity with the master) is sourced + // from in charts/alluxio - not the pod's own IP. + workerPod := func(ordinal int, hostIP string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-worker-%d", testDrainWorkerSts, ordinal), + Namespace: testDrainNamespace, + }, + Status: corev1.PodStatus{ + HostIP: hostIP, + }, + } + } + + Context("when the pod targeted for removal is already gone", func() { + It("treats a NotFound pod as already decommissioned", func() { + engine = newEngineWithPods() + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeTrue()) + }) + }) + + Context("when the pod has not yet been assigned a host IP", func() { + It("returns not drained without error", func() { + engine = newEngineWithPods(workerPod(1, "")) + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when the decommission call fails", func() { + It("propagates the error", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return errors.New("decommission failed") + }) + defer patch.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).To(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when active workers are still above the desired count", func() { + It("returns not drained and requests a retry", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return nil + }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { + return 2, nil + }) + defer patch2.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeFalse()) + }) + }) + + Context("when the worker has successfully drained", func() { + It("returns drained with no error", func() { + engine = newEngineWithPods(workerPod(1, "10.0.0.1")) + patch1 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.DecommissionWorkers, + func(_ operations.AlluxioFileUtils, _ []string) error { + return nil + }) + defer patch1.Reset() + patch2 := gomonkey.ApplyFunc(operations.AlluxioFileUtils.CountActiveWorkers, + func(_ operations.AlluxioFileUtils) (int, error) { + return 1, nil + }) + defer patch2.Reset() + + drained, err := engine.drainScalingDownWorkers(context.TODO(), rt, 1, 2) + Expect(err).NotTo(HaveOccurred()) + Expect(drained).To(BeTrue()) + }) + }) +}) + +var _ = Describe("AlluxioEngine getWorkerRPCPort", Label("pkg.ddc.alluxio.replicas_drain_test.go"), func() { + var engine *AlluxioEngine + + BeforeEach(func() { + engine = newAlluxioEngineREP(fake.NewFakeClientWithScheme(testScheme), testDrainWorkerSts, testDrainNamespace) + }) + + It("returns the configured rpc port when set", func() { + rt := &v1alpha1.AlluxioRuntime{ + Spec: v1alpha1.AlluxioRuntimeSpec{ + Worker: v1alpha1.AlluxioCompTemplateSpec{ + Ports: map[string]int{"rpc": 12345}, + }, + }, + } + Expect(engine.getWorkerRPCPort(rt)).To(Equal(12345)) + }) + + It("falls back to the default port when unset", func() { + rt := &v1alpha1.AlluxioRuntime{} + Expect(engine.getWorkerRPCPort(rt)).To(Equal(defaultWorkerRPCPort)) + }) + + It("falls back to the default port when the configured value is not positive", func() { + rt := &v1alpha1.AlluxioRuntime{ + Spec: v1alpha1.AlluxioRuntimeSpec{ + Worker: v1alpha1.AlluxioCompTemplateSpec{ + Ports: map[string]int{"rpc": 0}, + }, + }, + } + Expect(engine.getWorkerRPCPort(rt)).To(Equal(defaultWorkerRPCPort)) + }) +}) diff --git a/pkg/features/features.go b/pkg/features/features.go new file mode 100644 index 00000000000..8332afcdd0c --- /dev/null +++ b/pkg/features/features.go @@ -0,0 +1,46 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package features + +import ( + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/component-base/featuregate" +) + +const ( + // GracefulWorkerScaleDown gates graceful worker scale-down for AlluxioRuntime + // on a standard Kubernetes StatefulSet. + // + // When enabled, workers targeted for removal are decommissioned from the + // Alluxio cluster before the pod is terminated, giving the master time to + // migrate their cached blocks to the surviving workers. Without this gate, + // cached data held on removed workers is lost immediately on scale-in. + // + // This only supports the standard StatefulSet scale-down order (highest + // ordinal first); it does not yet integrate with OpenKruise's Advanced + // StatefulSet for selective deletion of non-highest-ordinal pods. + GracefulWorkerScaleDown featuregate.Feature = "GracefulWorkerScaleDown" +) + +var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + GracefulWorkerScaleDown: {Default: false, PreRelease: featuregate.Alpha}, +} + +func init() { + runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultFeatureGates)) +} diff --git a/test/gha-e2e/curvine/read_job.yaml b/test/gha-e2e/curvine/read_job.yaml index e7d584f682f..bb96ee806e1 100644 --- a/test/gha-e2e/curvine/read_job.yaml +++ b/test/gha-e2e/curvine/read_job.yaml @@ -24,7 +24,14 @@ spec: command: ['sh'] args: - -c - - set -ex; test -n "$(cat /data/minio/bar)" + - | + for i in $(seq 1 12); do + content=$(cat /data/minio/bar 2>/dev/null) && [ -n "$content" ] && exit 0 + echo "Attempt $i: /data/minio/bar not readable yet, retrying in 5s..." + sleep 5 + done + echo "ERROR: /data/minio/bar is not readable after 60 seconds" + exit 1 volumeMounts: - name: data-vol mountPath: /data