Skip to content

Commit e52ad0a

Browse files
committed
JIT: minor update
1 parent 87e661d commit e52ad0a

5 files changed

Lines changed: 39 additions & 26 deletions

File tree

ftio/api/gekkoFs/jit/execute_and_wait.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -470,10 +470,12 @@ def end_of_transfer_online(
470470
def get_files(settings: JitSettings, verbose=True):
471471
monitored_files = []
472472
files = ""
473+
# TODO: fix find for gekko
474+
command_ls = flaged_mpiexec_call(settings, f" ls -l {settings.gkfs_mntdir}")
475+
# command_ls = flaged_mpiexec_call(settings, f" find {settings.gkfs_mntdir}")
473476
try:
474-
# TODO: fix find for gekko
475-
command_ls = flaged_mpiexec_call(settings, f" find {settings.gkfs_mntdir}")
476-
files = subprocess.check_output(command_ls, shell=True).decode()
477+
# files = subprocess.check_output(command_ls, shell=True).decode()
478+
files = subprocess.check_output(command_ls, shell=True, text=True)
477479
if files:
478480
files = files.splitlines()
479481
files = [f for f in files if "LIBGKFS" not in f]
@@ -492,7 +494,7 @@ def get_files(settings: JitSettings, verbose=True):
492494
for f in monitored_files:
493495
console.print(f"[cyan]{f}[/]")
494496
except Exception as e:
495-
jit_print(f"[red] >> Error running test script:\n{e}")
497+
jit_print(f"[red] >> Error listing files script:\n{e}")
496498

497499
return monitored_files
498500

ftio/api/gekkoFs/jit/jitsettings.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,10 @@ def set_variables(self) -> None:
286286
self.run_dir = "."
287287
self.app_flags = (
288288
f"workload=unet3d_my_a100 "
289+
# f"workload=bert_v100 "
289290
f"++workload.workflow.generate_data=True ++workload.workflow.train=True ++workload.workflow.checkpoint=True " #++workload.workflow.evaluation=True "
290-
f"++workload.dataset.data_folder={self.run_dir}/data/unet3d ++workload.checkpoint.checkpoint_folder={self.run_dir}/checkpoints/unet3d "
291-
f"++workload.output.output_folder={self.run_dir}/hydra_log/unet3d "
291+
f"++workload.dataset.data_folder={self.run_dir}/data/jit ++workload.checkpoint.checkpoint_folder={self.run_dir}/checkpoints/jit "
292+
f"++workload.output.output_folder={self.run_dir}/hydra_log/jit "
292293
)
293294
## ├─ LAMMPS -->
294295
# self.app_call = "/lustre/project/nhr-admire/shared/mylammps/build/lmp"
@@ -315,13 +316,14 @@ def set_variables(self) -> None:
315316
self.pre_app_call = ""
316317
self.post_app_call = ""
317318
else:
318-
self.run_dir = self.gkfs_mntdir
319+
# self.run_dir = self.gkfs_mntdir #? don't enable this flag, as the executing node doesn't have this folder
319320
self.app_flags = (
320321
f"workload=unet3d_my_a100_gekko "
322+
# f"workload=bert_v100 "
321323
f"++workload.workflow.generate_data=True ++workload.workflow.train=True ++workload.workflow.checkpoint=True " #++workload.workflow.evaluation=True "
322-
f"++workload.dataset.data_folder={self.run_dir}/data/unet3d ++workload.checkpoint.checkpoint_folder={self.run_dir}/checkpoints/unet3d "
323-
f"++workload.output.output_folder={self.run_dir}/hydra_log/unet3d "
324-
# ++workload.output.log_file={self.run_dir}/hydra_log/unet3d"# ++workload.dataset.num_files_train=16"
324+
f"++workload.dataset.data_folder={self.gkfs_mntdir}/data/jit ++workload.checkpoint.checkpoint_folder={self.gkfs_mntdir}/checkpoints/jit "
325+
f"++workload.output.output_folder={self.gkfs_mntdir}/hydra_log/jit "
326+
# ++workload.output.log_file={self.gkfs_mntdir}/hydra_log/unet3d"# ++workload.dataset.num_files_train=16"
325327
)
326328
# self.app_flags = "workload=unet3d_my_a100_gekko"
327329
# self.pre_app_call = f"mpirun -np 10 dlio_benchmark {self.app_flags} ++workload.workflow.generate_data=True ++workload.workflow.train=False"

ftio/api/gekkoFs/jit/setup_core.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ def stage_out(settings: JitSettings, runtime: JitTime) -> None:
350350
# additional_arguments = load_flags(settings)
351351
# call = f"{additional_arguments} cp -r {settings.gkfs_mntdir}/* {settings.stage_out_path} "
352352
call = flaged_mpiexec_call(
353-
settings, f"cp -r {settings.gkfs_mntdir} {settings.stage_out_path}"
353+
settings, f"cp -r {settings.gkfs_mntdir} {settings.stage_out_path} || echo 'nothing to stage in'"
354354
)
355355
start = time.time()
356356
_ = execute_block(call, dry_run=settings.dry_run)
@@ -446,8 +446,8 @@ def start_application(settings: JitSettings, runtime: JitTime):
446446

447447
# f" cd {settings.run_dir} && "
448448
# f"strace -f -e trace=read,write,open,close,stat,fstat,lseek,access -o /gpfs/fs1/home/tarrafah/strace_n{settings.app_nodes}_p{settings.procs_app}.txt mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
449-
f" cd {settings.run_dir} && time mpiexec --mca errhandler ftmpi --mca mpi_abort_print_stack 1 -np {settings.app_nodes*settings.procs_app} --oversubscribe "
450-
# f" cd {settings.run_dir} && time mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
449+
# f" cd {settings.run_dir} && time -p mpiexec --mca errhandler ftmpi --mca mpi_abort_print_stack 1 -np {settings.app_nodes*settings.procs_app} --oversubscribe "
450+
f" cd {settings.run_dir} && time -p mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
451451
f"--hostfile {settings.dir}/hostfile_mpi --map-by node "
452452
f"{additional_arguments} "
453453
f"{settings.task_set_1} {settings.app_call} {settings.app_flags}"

ftio/api/gekkoFs/jit/setup_helper.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -934,9 +934,10 @@ def log_dir(settings: JitSettings):
934934
settings.log_dir += f"_{settings.log_suffix}"
935935

936936
counter = 0
937+
name = settings.log_dir
937938
while os.path.exists(settings.log_dir):
938939
counter += 1
939-
settings.log_dir = f"{settings.log_dir}_{counter}"
940+
settings.log_dir = f"{name}_{counter}"
940941

941942
# Resolve and return the absolute path of LOG_DIR
942943
settings.log_dir = os.path.abspath(settings.log_dir)
@@ -986,12 +987,12 @@ def get_address_cargo(settings: JitSettings) -> None:
986987
settings.cargo_server = f"ofi+tcp://{settings.address_cargo}:62000"
987988

988989
jit_print(f">> Address CARGO: {settings.address_cargo}")
989-
jit_print(f">> CARGO server: {settings.cargo_server} ", True)
990+
jit_print(f">> CARGO server: {settings.cargo_server} \n")
990991

991992

992993
def set_dir_gekko(settings: JitSettings) -> None:
993994
if settings.node_local and settings.cluster:
994-
jit_print(">> Setting Gekko root dir to node local")
995+
jit_print(">> Node local flag set")
995996
# old_gkfs_rootdir = settings.gkfs_rootdir
996997
old_gkfs_mntdir = settings.gkfs_mntdir
997998

@@ -1001,14 +1002,20 @@ def set_dir_gekko(settings: JitSettings) -> None:
10011002
settings.gkfs_mntdir = (
10021003
f"/localscratch/{settings.job_id}/{os.path.basename(settings.gkfs_mntdir)}"
10031004
)
1004-
jit_print(f">> Gekko root dir set to: {settings.gkfs_rootdir}")
1005-
jit_print(f">> Gekko mnt dir set to: {settings.gkfs_mntdir}")
1005+
jit_print(f">> |-> Gekko root dir updated to: {settings.gkfs_rootdir}")
1006+
jit_print(f">> |-> Gekko mnt dir updated to: {settings.gkfs_mntdir}")
10061007

10071008
if old_gkfs_mntdir in settings.run_dir:
10081009
settings.run_dir = settings.run_dir.replace(old_gkfs_mntdir, settings.gkfs_mntdir)
10091010
jit_print(
1010-
f">> Run dir set to: {settings.gkfs_rootdir}",
1011+
f">> |-> Run dir updated to: {settings.run_dir}",
10111012
)
1013+
if old_gkfs_mntdir in settings.app_flags:
1014+
settings.app_flags = settings.app_flags.replace(old_gkfs_mntdir, settings.gkfs_mntdir)
1015+
jit_print(
1016+
f">> |-> App flags updated to: {settings.app_flags}",
1017+
)
1018+
10121019

10131020

10141021
def print_settings(settings) -> None:

ftio/api/gekkoFs/jit/setup_init.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,26 @@ def init_gekko(settings: JitSettings) -> None:
1010
"""
1111
if not settings.exclude_daemon:
1212
create_hostfile(settings)
13+
calls =[]
1314
# set debug flag
1415
if settings.cluster:
1516
# Create directories
16-
# call_0 = f"srun --jobid={settings.job_id} {settings.single_node_command} -N 1 --ntasks=1 mkdir -p {settings.gkfs_mntdir}"
17-
call_0 =(
17+
# calls_0 = f"srun --jobid={settings.job_id} {settings.single_node_command} -N 1 --ntasks=1 mkdir -p {settings.gkfs_mntdir}"
18+
calls.append(
1819
f"srun --jobid={settings.job_id} {settings.app_nodes_command} --disable-status -N {settings.app_nodes} "
1920
f"--ntasks={settings.app_nodes} --cpus-per-task={settings.procs_daemon} --ntasks-per-node=1 --overcommit --overlap "
2021
f"--oversubscribe --mem=0 mkdir -p {settings.gkfs_mntdir}"
2122
)
22-
call_1 =(
23+
calls.append(
2324
f"srun --jobid={settings.job_id} {settings.app_nodes_command} --disable-status -N {settings.app_nodes} "
2425
f"--ntasks={settings.app_nodes} --cpus-per-task={settings.procs_daemon} --ntasks-per-node=1 --overcommit --overlap "
2526
f"--oversubscribe --mem=0 mkdir -p {settings.gkfs_rootdir}"
2627
)
2728
else:
28-
call_0 = f"mkdir -p {settings.gkfs_mntdir}"
29-
call_1 = f"mkdir -p {settings.gkfs_rootdir}"
29+
calls.append(f"mkdir -p {settings.gkfs_mntdir}")
30+
calls.append(f"mkdir -p {settings.gkfs_rootdir}")
3031

3132
jit_print("[cyan]>> Creating directories[/]")
32-
_ = execute_block(call_0)
33-
_ = execute_block(call_1)
33+
for call in calls:
34+
_ = execute_block(call)
35+

0 commit comments

Comments
 (0)