Skip to content

Commit fbda808

Browse files
kbumalbianchi-lbl
and authored
parallelize USF and NUSF algorithms using Dask (#1198)
* parallelize USF algorithm using Dask * code formatting * removing import that is not yet ready * put back update best_val removed during refactor * adding minimax mode to USF test * do timing in benchmark * fixing import order per linter * adding dataframe extenstion from dask and pinning to <2024.3 due to bug reported to dask * cleaning up seed usage in dask USF test and benchmark * tweak to work as expected with dask 2024.3 * restrict pytest version * NUSF algorithm parallelization using Dask * fixing up dask tests that load files * Format with Black * make dask CLI flag `kebab-kase` Co-authored-by: Ludovico Bianchi <lbianchi@lbl.gov> * adding type annotations and reusing nusf functions * adding logging when loading dask client fails * fixing type hints to work with python 3.8 * fix type annotation for python 3.8 --------- Co-authored-by: Ludovico Bianchi <lbianchi@lbl.gov>
1 parent 9aa7d35 commit fbda808

13 files changed

Lines changed: 803 additions & 13 deletions

File tree

foqus_lib/foqus.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,21 @@ def main(args_to_parse=None):
348348
parser.add_argument(
349349
"-s", "--runUITestScript", help="Load and run a user interface test script"
350350
)
351+
parser.add_argument(
352+
"--sdoe-use-dask",
353+
action="store_true",
354+
help="Use Dask for parallelizing SDoE calculations",
355+
dest="sdoe_use_dask",
356+
)
357+
351358
args = parser.parse_args(args=args_to_parse)
359+
if args.sdoe_use_dask:
360+
import dask.config as dconf
361+
from dask.distributed import Client
362+
363+
Client() # n_workers=4, threads_per_worker=1)
364+
dconf.set({"dataframe.convert-string": False})
365+
352366
# before changing the directory get absolute path for file to load
353367
# this way it will be relative to where you execute foqus instead
354368
# or relative to the working dir

foqus_lib/framework/sdoe/nusf.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# "https://github.com/CCSI-Toolset/FOQUS".
1414
#################################################################################
1515
import time
16-
from typing import Optional, Tuple, List, Dict, TypedDict
16+
from typing import Dict, List, Optional, Tuple, TypedDict, Union
1717

1818
import numpy as np
1919
import pandas as pd # only used for the final output of criterion
@@ -60,6 +60,7 @@ def update_min_dist(
6060
mties: int,
6161
dmat: np.ndarray,
6262
hist: np.ndarray,
63+
rand_seed: Union[int, None],
6364
) -> Tuple[
6465
np.ndarray, float, np.ndarray, int, np.ndarray, Optional[int], Optional[int], bool
6566
]:
@@ -145,10 +146,11 @@ def step(
145146
update = True
146147
added = None
147148
removed = None
149+
rand_gen = np.random.default_rng(rand_seed)
148150

149151
if d0_max > md: # if maximin increased
150152
_md = d0_max
151-
k = np.random.randint(pts.shape[0])
153+
k = rand_gen.integers(low=0, high=pts.shape[0])
152154
pt = pts[k]
153155
rcand, md, mdpts, mties, dmat, added, removed = step(
154156
pt, rcand, cand, mdpts_cand, dmat
@@ -157,7 +159,7 @@ def step(
157159
nselect = np.argwhere(mt0[pts[:, 0], pts[:, 1]] < mties).flatten()
158160
if nselect.size > 0:
159161
pt = pts[
160-
np.random.choice(nselect)
162+
rand_gen.choice(nselect)
161163
] # take the subset of pts where the corresponding ties is less than mties
162164
rcand, md, mdpts, mties, dmat, added, removed = step(
163165
pt, rcand, cand, mdpts_cand, dmat
@@ -272,6 +274,7 @@ def criterion(
272274
nd: int, # design size <= len(candidates)
273275
mode: str = "maximin",
274276
hist: Optional[pd.DataFrame] = None,
277+
rand_seed: Union[int, None] = None,
275278
) -> Dict:
276279
ncand = len(cand)
277280
if hist is not None:
@@ -304,6 +307,8 @@ def criterion(
304307
np.concatenate((cand_np_, hist.to_numpy())), idx_np
305308
)
306309

310+
rand_gen = np.random.default_rng(rand_seed)
311+
307312
def step(mwr: int) -> Dict:
308313
cand_np = scale_y(scale_method, mwr, cand_np_, idw_np)
309314

@@ -328,7 +333,7 @@ def step(mwr: int) -> Dict:
328333
wts = cand_np[:, idw_np]
329334
wts_sum = np.sum(wts)
330335
prob = wts / wts_sum
331-
rand_index = np.random.choice(ncand, nd, replace=False, p=prob)
336+
rand_index = rand_gen.choice(ncand, nd, replace=False, p=prob)
332337

333338
# extract the <nd> rows
334339
rcand = cand_np[rand_index]
@@ -360,6 +365,7 @@ def step(mwr: int) -> Dict:
360365
mties,
361366
dmat,
362367
hist=hist_np,
368+
rand_seed=rand_seed + rand_index if rand_seed is not None else None,
363369
)
364370

365371
if update_:
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
#################################################################################
2+
# FOQUS Copyright (c) 2012 - 2023, by the software owners: Oak Ridge Institute
3+
# for Science and Education (ORISE), TRIAD National Security, LLC., Lawrence
4+
# Livermore National Security, LLC., The Regents of the University of
5+
# California, through Lawrence Berkeley National Laboratory, Battelle Memorial
6+
# Institute, Pacific Northwest Division through Pacific Northwest National
7+
# Laboratory, Carnegie Mellon University, West Virginia University, Boston
8+
# University, the Trustees of Princeton University, The University of Texas at
9+
# Austin, URS Energy & Construction, Inc., et al. All rights reserved.
10+
#
11+
# Please see the file LICENSE.md for full copyright and license information,
12+
# respectively. This file is also available online at the URL
13+
# "https://github.com/CCSI-Toolset/FOQUS".
14+
#################################################################################
15+
import time
16+
from typing import Dict, List, Optional, Tuple, TypedDict, Union
17+
18+
import dask.bag as db
19+
import numpy as np
20+
import pandas as pd # only used for the final output of criterion
21+
from scipy.stats import rankdata
22+
23+
from .distance import compute_dist, compute_min_params
24+
from .nusf import compute_dmat, scale_xs, scale_y, update_min_dist
25+
26+
27+
def criterion(
    cand: pd.DataFrame,  # candidates
    args: TypedDict(
        "args",
        {
            "icol": str,
            "xcols": List,
            "wcol": str,
            "max_iterations": int,
            "mwr_values": List,
            "scale_method": str,
        },
    ),  # maximum number of iterations, mwr values, scale method, index types
    nr: int,  # number of restarts (each restart uses a random set of <nd> points)
    nd: int,  # design size <= len(candidates)
    mode: str = "maximin",
    hist: Optional[pd.DataFrame] = None,
    rand_seed: Union[int, None] = None,
) -> Dict:
    """Dask-parallel NUSF (non-uniform space filling) design search.

    For each mean-weight ratio (mwr) value in ``args["mwr_values"]``, runs
    ``nr`` random restarts of a maximin exchange search; both the restarts
    within an mwr value (inner ``choose``) and the per-mwr outer ``step``
    calls are mapped over ``dask.bag`` sequences. Only ``mode="maximin"``
    is accepted (asserted below).

    Returns a dict keyed by mwr value; each entry is the ``results`` dict
    built in ``step`` (best scaled/unscaled candidates, indices, best
    minimum distance, tie info, distance matrix, timing, etc.).

    When ``rand_seed`` is given, the restart sampling and each restart's
    exchange steps are seeded deterministically (the per-restart seed is
    ``rand_seed + rand_index``, an integer array — accepted by
    ``np.random.default_rng`` as a seed sequence).
    """
    ncand = len(cand)
    if hist is not None:
        # number of historical (already-run) points appended after the candidates
        nhist = len(hist)
    assert nd <= ncand, "Number of designs exceeds number of available candidates."

    mode = mode.lower()
    assert (
        mode == "maximin"
    ), "MODE {} not recognized for NUSF. Only MAXIMIN is currently supported.".format(
        mode
    )

    T = args["max_iterations"]
    mwr_vals = args["mwr_values"]
    scale_method = args["scale_method"]

    # index types: column positions of the design (x) columns and the weight column
    idx_np = [cand.columns.get_loc(i) for i in args["xcols"]]
    idw_np = cand.columns.get_loc(args["wcol"])

    cand_np_ = cand.to_numpy()
    # keep an unscaled copy so best rows can be recovered by index at the end
    cand_np_unscaled = cand_np_.copy()

    # Combine candidates and history before scaling (so both share one x-scale)
    if hist is None:
        cand_np_, _xmin, _xmax = scale_xs(cand_np_, idx_np)
    else:
        cand_np_, _xmin, _xmax = scale_xs(
            np.concatenate((cand_np_, hist.to_numpy())), idx_np
        )

    # master generator: drives the weighted restart sampling below
    rand_gen = np.random.default_rng(rand_seed)

    def step(
        mwr_tuple: Tuple[int, List[int], np.ndarray, Union[np.ndarray, None]]
    ) -> Dict:
        # One full search for a single mwr value: run all restarts (choose)
        # and keep the best design found.
        mwr, rands, cand_np, hist_np = mwr_tuple

        best_cand = []
        best_md = 0
        best_mties = 0
        best_index = []

        t0 = time.time()

        def choose(rand_index):
            # One restart: start from the sampled rows, then do exchange
            # steps (update_min_dist) until no improvement or T iterations.
            # extract the <nd> rows
            rcand = cand_np[rand_index]
            dmat = compute_dmat(rcand, idx_np, idw_np, hist=hist_np)
            md, mdpts, mties = compute_min_params(dmat)

            update_ = True
            t = 0

            while update_ and (t < T):

                (
                    rcand_,
                    md_,
                    mdpts_,
                    mties_,
                    dmat_,
                    added_,
                    removed_,
                    update_,
                ) = update_min_dist(
                    rcand,
                    cand_np,
                    ncand,
                    idx_np,
                    idw_np,
                    md,
                    mdpts,
                    mties,
                    dmat,
                    hist=hist_np,
                    # per-restart deterministic seed; None propagates "unseeded"
                    rand_seed=rand_seed + rand_index if rand_seed is not None else None,
                )

                if update_:
                    rcand = rcand_
                    md = md_
                    mdpts = mdpts_
                    mties = mties_
                    dmat = dmat_
                    # NOTE(review): truthiness test — if added_ is candidate
                    # index 0 this branch is skipped; presumably mirrors the
                    # serial nusf.py implementation, but confirm `added_ is
                    # not None` was not intended.
                    if added_:
                        rand_index[removed_] = added_

                t += 1
            return {
                "index": rand_index,
                "cand": rcand,
                "md": md,
                "mdpts": mdpts,
                "mties": mties,
                "dmat": dmat,
            }

        # restarts run in parallel via dask.bag; iterating the mapped bag
        # pulls results back and the best-so-far reduction happens serially
        for x in db.from_sequence(rands).map(choose):
            # better if minimum distance increased, or tied with fewer ties
            if (x["md"] > best_md) or (
                (x["md"] == best_md) and (x["mties"] < best_mties)
            ):
                best_index = x["index"]
                best_cand = x["cand"]
                best_md = x["md"]
                best_mdpts = x["mdpts"]
                best_mties = x["mties"]
                best_dmat = x["dmat"]
            print("Best minimum distance for this random start: {}".format(best_md))

        elapsed_time = time.time() - t0

        # no need to inverse-scale; can just use the indices to look up original rows in cand_
        best_cand_unscaled = cand_np_unscaled[best_index]

        best_cand = pd.DataFrame(best_cand, index=best_index, columns=list(cand))
        best_cand_unscaled = pd.DataFrame(
            best_cand_unscaled, index=best_index, columns=list(cand)
        )

        # NOTE(review): best_mdpts/best_dmat are only bound inside the
        # improvement branch above; if no restart ever beats the initial
        # best_md=0/best_mties=0 this would raise NameError — verify the
        # initial values guarantee the first restart always wins.
        results = {
            "best_cand_scaled": best_cand,
            "best_cand": best_cand_unscaled,
            "best_index": best_index,
            "best_val": best_md,
            "best_mdpts": best_mdpts,
            "best_mties": best_mties,
            "best_dmat": best_dmat,
            "mode": mode,
            "design_size": nd,
            "num_restarts": nr,
            "mwr": mwr,
            "elapsed_time": elapsed_time,
        }

        return results

    # Pre-build, per mwr value: the y-scaled candidate array, the (optional)
    # history array, and the nr weighted random restart index sets. Sampling
    # happens here, on the master generator, so results are reproducible
    # regardless of dask scheduling order.
    rands, cand_nps, hist_nps = [], [], []
    for mwr in mwr_vals:
        cand_np = scale_y(scale_method, mwr, cand_np_, idw_np)

        if hist is None:
            hist_np = None
        else:
            # split the concatenated array back into candidates vs. history
            hist_np = cand_np[-nhist:]
            cand_np = cand_np[:ncand]

        # restart sampling is weighted by the (scaled) weight column
        wts = cand_np[:, idw_np]
        wts_sum = np.sum(wts)
        prob = wts / wts_sum
        rand = []
        for i in range(nr):
            # sample without replacement <nd> indices
            rand.append(rand_gen.choice(ncand, nd, replace=False, p=prob))
        rands.append(rand)
        cand_nps.append(cand_np)
        hist_nps.append(hist_np)

    # one parallel step() per mwr value; compute() materializes all results
    results = db.from_sequence(zip(mwr_vals, rands, cand_nps, hist_nps)).map(step)
    return dict(zip(mwr_vals, results.compute()))

foqus_lib/framework/sdoe/sdoe.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@
1313
# "https://github.com/CCSI-Toolset/FOQUS".
1414
#################################################################################
1515
import configparser
16+
import logging
1617
import os
1718
import platform
1819
import re
1920
import tempfile
2021
import time
2122
from typing import Tuple, Dict
2223

24+
from dask.distributed import get_client
2325
import numpy as np
2426
import pandas as pd
2527

@@ -85,6 +87,17 @@ def run(config_file: str, nd: int, test: bool = False) -> Tuple[Dict, Dict, floa
8587

8688
sf_method = config["SF"]["sf_method"]
8789

90+
# check whether to use dask version of algorithms
91+
use_dask = False
92+
try:
93+
get_client()
94+
use_dask = True
95+
except ValueError:
96+
logging.getLogger("foqus." + __name__).exception(
97+
"Unable to load Dask client, continuing without it using original algorithms"
98+
)
99+
pass
100+
88101
if sf_method == "nusf":
89102
# 'Weight' column (should only be one)
90103
idw = [x for x, t in zip(include, types) if t == "Weight"]
@@ -111,7 +124,10 @@ def run(config_file: str, nd: int, test: bool = False) -> Tuple[Dict, Dict, floa
111124
"mwr_values": mwr_values,
112125
"scale_method": scale_method,
113126
}
114-
from .nusf import criterion
127+
if use_dask:
128+
from .nusf_dask import criterion
129+
else:
130+
from .nusf import criterion
115131

116132
if sf_method == "usf":
117133
scl = np.array([ub - lb for ub, lb in zip(max_vals, min_vals)])
@@ -120,7 +136,10 @@ def run(config_file: str, nd: int, test: bool = False) -> Tuple[Dict, Dict, floa
120136
"xcols": idx,
121137
"scale_factors": pd.Series(scl, index=include),
122138
}
123-
from .usf import criterion
139+
if use_dask:
140+
from .usf_dask import criterion
141+
else:
142+
from .usf import criterion
124143

125144
if sf_method == "irsf":
126145
args = {
@@ -160,8 +179,12 @@ def run(config_file: str, nd: int, test: bool = False) -> Tuple[Dict, Dict, floa
160179
return results["t1"], results["t2"]
161180
else:
162181
t0 = time.time()
163-
_results = criterion(cand, args, nr, nd, mode=mode, hist=hist)
182+
criterion(cand, args, nr, nd, mode=mode, hist=hist)
164183
elapsed_time = time.time() - t0
184+
if use_dask:
185+
return (
186+
elapsed_time - 1.4
187+
) # Dask startup skews the test results so remove that
165188
return elapsed_time
166189

167190
# otherwise, run sdoe for real

0 commit comments

Comments
 (0)