-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: support graceful scale-down for AlluxioRuntime using AdvancedStatefulSet #5805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| Copyright 2026 The Fluid Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package operations | ||
|
|
||
| import "strings" | ||
|
|
||
| // DecommissionWorkers signals the Alluxio master to decommission the given | ||
| // workers. Each address must be in "<host>:<rpcPort>" form. | ||
| // The call is idempotent: re-issuing it against an already-decommissioned | ||
| // worker is safe. | ||
| // | ||
| // Requires Alluxio >= 2.9, where "fsadmin decommissionWorker" was introduced; | ||
| // against older masters this subcommand does not exist. | ||
| func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error { | ||
| if len(addresses) == 0 { | ||
| return nil | ||
| } | ||
| command := []string{ | ||
| "alluxio", "fsadmin", "decommissionWorker", | ||
| "--addresses", strings.Join(addresses, ","), | ||
| } | ||
| _, _, err := a.exec(command, false) | ||
| if err != nil { | ||
| a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses) | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| // CountActiveWorkers returns the number of live workers according to | ||
| // "alluxio fsadmin report capacity -live". The "-live" flag is what makes | ||
| // this safe to compare against immediately after a decommission: it asks the | ||
| // master for currently live workers rather than every worker it still has a | ||
| // record of, so a worker that was just decommissioned doesn't linger in the | ||
| // count until its heartbeat times out. | ||
| func (a AlluxioFileUtils) CountActiveWorkers() (int, error) { | ||
| report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity", "-live"}, false) | ||
| if err != nil { | ||
| a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed") | ||
| return 0, err | ||
| } | ||
| return parseActiveWorkerCount(report), nil | ||
| } | ||
|
|
||
| // parseActiveWorkerCount counts workers in the capacity report produced by | ||
| // "alluxio fsadmin report capacity". Worker entries begin at the non-indented | ||
| // line after the "Worker Name" header; the indented line that follows each | ||
| // entry contains the used-capacity detail. | ||
| // | ||
| // Worker Name Last Heartbeat Storage MEM | ||
| // 192.168.1.147 0 capacity 2048.00MB <- worker entry | ||
| // used 443.89MB (21%) <- detail, indented | ||
| // 192.168.1.146 0 capacity 2048.00MB <- worker entry | ||
| // used 0B (0%) | ||
| func parseActiveWorkerCount(report string) int { | ||
| inWorkerSection := false | ||
| count := 0 | ||
| for _, line := range strings.Split(report, "\n") { | ||
|
jakharmonika364 marked this conversation as resolved.
|
||
| if strings.HasPrefix(line, "Worker Name") { | ||
| inWorkerSection = true | ||
| continue | ||
| } | ||
| if !inWorkerSection || strings.TrimSpace(line) == "" { | ||
| continue | ||
| } | ||
| // Non-indented lines are new worker entries; indented lines are | ||
| // the used-capacity continuation for the previous entry. | ||
| if line[0] != ' ' && line[0] != '\t' { | ||
| count++ | ||
| } | ||
| } | ||
| return count | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| /* | ||
| Copyright 2024 The Fluid Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package operations | ||
|
|
||
| import ( | ||
| "errors" | ||
| "testing" | ||
|
|
||
| "github.com/agiledragon/gomonkey/v2" | ||
| "github.com/fluid-cloudnative/fluid/pkg/utils/fake" | ||
| ) | ||
|
|
||
| func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) { | ||
|
Check failure on line 27 in pkg/ddc/alluxio/operations/decommission_test.go
|
||
| a := &AlluxioFileUtils{log: fake.NullLogger()} | ||
|
|
||
| t.Run("empty address list is a no-op", func(t *testing.T) { | ||
| if err := a.DecommissionWorkers(nil); err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| if err := a.DecommissionWorkers([]string{}); err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("exec error is propagated", func(t *testing.T) { | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { | ||
| return "", "", errors.New("exec failed") | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil { | ||
| t.Error("want error, got nil") | ||
| } | ||
| }) | ||
|
|
||
| t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) { | ||
| var capturedCmd []string | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { | ||
| capturedCmd = cmd | ||
| return "", "", nil | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| addr := "192.168.1.1:29999" | ||
| if err := a.DecommissionWorkers([]string{addr}); err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| found := false | ||
| for _, arg := range capturedCmd { | ||
| if arg == addr { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| t.Errorf("address %q not found in command: %v", addr, capturedCmd) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("multiple addresses are joined with commas", func(t *testing.T) { | ||
| var capturedCmd []string | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { | ||
| capturedCmd = cmd | ||
| return "", "", nil | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| found := false | ||
| for _, arg := range capturedCmd { | ||
| if arg == "10.0.0.1:29999,10.0.0.2:29999" { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| t.Errorf("joined addresses not found in command: %v", capturedCmd) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) { | ||
|
Check failure on line 101 in pkg/ddc/alluxio/operations/decommission_test.go
|
||
| a := &AlluxioFileUtils{log: fake.NullLogger()} | ||
|
|
||
| t.Run("exec error returns zero and the error", func(t *testing.T) { | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { | ||
| return "", "", errors.New("exec failed") | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| count, err := a.CountActiveWorkers() | ||
| if err == nil { | ||
| t.Error("want error, got nil") | ||
| } | ||
| if count != 0 { | ||
| t.Errorf("want 0 on error, got %d", count) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("requests live workers only", func(t *testing.T) { | ||
| var capturedCmd []string | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { | ||
| capturedCmd = cmd | ||
| return "", "", nil | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| if _, err := a.CountActiveWorkers(); err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| found := false | ||
| for _, arg := range capturedCmd { | ||
| if arg == "-live" { | ||
| found = true | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| t.Errorf("-live flag not found in command: %v", capturedCmd) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("two active workers", func(t *testing.T) { | ||
| report := `Capacity information for all workers: | ||
| Total Capacity: 4096.00MB | ||
| Used Capacity: 443.89MB | ||
|
|
||
| Worker Name Last Heartbeat Storage MEM | ||
| 192.168.1.147 0 capacity 2048.00MB | ||
| used 443.89MB (21%) | ||
| 192.168.1.146 0 capacity 2048.00MB | ||
| used 0B (0%) | ||
| ` | ||
| patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, | ||
| func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { | ||
| return report, "", nil | ||
| }) | ||
| defer patches.Reset() | ||
|
|
||
| count, err := a.CountActiveWorkers() | ||
| if err != nil { | ||
| t.Fatalf("want nil, got: %v", err) | ||
| } | ||
| if count != 2 { | ||
| t.Errorf("want 2, got %d", count) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| func TestParseActiveWorkerCount(t *testing.T) { | ||
| cases := []struct { | ||
| name string | ||
| input string | ||
| expect int | ||
| }{ | ||
| { | ||
| name: "empty report", | ||
| input: "", | ||
| expect: 0, | ||
| }, | ||
| { | ||
| name: "no worker section header", | ||
| input: "Capacity information for all workers:\n Total Capacity: 0B\n", | ||
| expect: 0, | ||
| }, | ||
| { | ||
| name: "single worker", | ||
| input: `Worker Name Last Heartbeat Storage MEM | ||
| 192.168.1.1 0 capacity 1024.00MB | ||
| used 0B (0%) | ||
| `, | ||
| expect: 1, | ||
| }, | ||
| { | ||
| name: "three workers", | ||
| input: `Worker Name Last Heartbeat Storage MEM | ||
| 10.0.0.1 0 capacity 2048.00MB | ||
| used 100MB (5%) | ||
| 10.0.0.2 0 capacity 2048.00MB | ||
| used 0B (0%) | ||
| 10.0.0.3 0 capacity 2048.00MB | ||
| used 500MB (25%) | ||
| `, | ||
| expect: 3, | ||
| }, | ||
| { | ||
| name: "trailing blank lines are ignored", | ||
| input: `Worker Name Last Heartbeat Storage MEM | ||
| 10.0.0.1 0 capacity 1024.00MB | ||
| used 0B (0%) | ||
|
|
||
|
|
||
| `, | ||
| expect: 1, | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range cases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| got := parseActiveWorkerCount(tc.input) | ||
| if got != tc.expect { | ||
| t.Errorf("want %d, got %d", tc.expect, got) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.