Skip to content

Commit 7e02a3c

Browse files
committed
JIT: added srun option
1 parent e52ad0a commit 7e02a3c

2 files changed

Lines changed: 124 additions & 62 deletions

File tree

ftio/api/gekkoFs/jit/setup_core.py

Lines changed: 93 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
elapsed_time,
1313
flaged_mpiexec_call,
1414
get_env,
15+
get_executable_realpath,
1516
handle_sigint,
1617
jit_print,
1718
mpiexec_call,
@@ -350,7 +351,8 @@ def stage_out(settings: JitSettings, runtime: JitTime) -> None:
350351
# additional_arguments = load_flags(settings)
351352
# call = f"{additional_arguments} cp -r {settings.gkfs_mntdir}/* {settings.stage_out_path} "
352353
call = flaged_mpiexec_call(
353-
settings, f"cp -r {settings.gkfs_mntdir} {settings.stage_out_path} || echo 'nothing to stage in'"
354+
settings,
355+
f"cp -r {settings.gkfs_mntdir} {settings.stage_out_path} || echo 'nothing to stage in'",
354356
)
355357
start = time.time()
356358
_ = execute_block(call, dry_run=settings.dry_run)
@@ -364,7 +366,9 @@ def stage_out(settings: JitSettings, runtime: JitTime) -> None:
364366

365367
if not settings.dry_run:
366368
try:
367-
call = flaged_mpiexec_call(settings, f"ls -R {settings.gkfs_mntdir}")
369+
call = flaged_mpiexec_call(
370+
settings, f"ls -R {settings.gkfs_mntdir}"
371+
)
368372
files = subprocess.check_output(call, shell=True).decode()
369373
console.print(
370374
f"[cyan]>> gekko_ls {settings.gkfs_mntdir}: \n{files}[/]"
@@ -406,75 +410,78 @@ def stage_out(settings: JitSettings, runtime: JitTime) -> None:
406410
#! App call
407411
#!################
408412
def start_application(settings: JitSettings, runtime: JitTime):
409-
name = settings.app_call.split("/", 1)[1] if "/" in settings.app_call else settings.app_call
413+
name = (
414+
settings.app_call.split("/", 1)[1]
415+
if "/" in settings.app_call
416+
else settings.app_call
417+
)
410418
console.print(
411419
f"[green bold]####### Executing Application {name} [/][black][{get_time()}][/]"
412420
)
413421
# set up dir
414422
original_dir = settings.dir
415423
jit_print(f">> Current directory {original_dir}")
416424
if not settings.dry_run:
417-
# check_setup(settings)
418-
pass
419-
425+
check_setup(settings)
426+
# pass
420427

421-
# s_call=""
422428
if settings.cluster:
423-
# without FTIO
424-
# ? [--stag in (si)--] [--stag out (so)--]
425-
# ? [---APP---]
426-
# with FTIO
427-
# ? [--stag in--] [so] [so] ... [so]
428-
# ? [---APP---]
429-
430429
additional_arguments = ""
431-
if not settings.exclude_ftio:
432-
additional_arguments += f"-x LIBGKFS_METRICS_IP_PORT={settings.address_ftio}:{settings.port} -x LIBGKFS_ENABLE_METRICS=on "
433-
if not settings.exclude_proxy:
434-
additional_arguments += (
435-
f"-x LIBGKFS_PROXY_PID_FILE={settings.gkfs_proxyfile} "
436-
)
437-
if not settings.exclude_daemon:
438-
additional_arguments += (
439-
f"-x LIBGKFS_LOG=info,warnings,errors "
440-
f"-x LIBGKFS_LOG_OUTPUT={settings.gekko_client_log} "
441-
f"-x LIBGKFS_HOSTS_FILE={settings.gkfs_hostfile} "
442-
f"-x LD_PRELOAD={settings.gkfs_intercept} "
443-
)
430+
if settings.use_mpirun:
431+
if not settings.exclude_ftio:
432+
additional_arguments += f"-x LIBGKFS_METRICS_IP_PORT={settings.address_ftio}:{settings.port} -x LIBGKFS_ENABLE_METRICS=on "
433+
if not settings.exclude_proxy:
434+
additional_arguments += (
435+
f"-x LIBGKFS_PROXY_PID_FILE={settings.gkfs_proxyfile} "
436+
)
437+
if not settings.exclude_daemon:
438+
additional_arguments += (
439+
f"-x LIBGKFS_LOG=info,warnings,errors "
440+
f"-x LIBGKFS_LOG_OUTPUT={settings.gekko_client_log} "
441+
f"-x LIBGKFS_HOSTS_FILE={settings.gkfs_hostfile} "
442+
f"-x LD_PRELOAD={settings.gkfs_intercept} "
443+
)
444444

445-
call = (
446-
447-
# f" cd {settings.run_dir} && "
448-
# f"strace -f -e trace=read,write,open,close,stat,fstat,lseek,access -o /gpfs/fs1/home/tarrafah/strace_n{settings.app_nodes}_p{settings.procs_app}.txt mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
449-
# f" cd {settings.run_dir} && time -p mpiexec --mca errhandler ftmpi --mca mpi_abort_print_stack 1 -np {settings.app_nodes*settings.procs_app} --oversubscribe "
450-
f" cd {settings.run_dir} && time -p mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
451-
f"--hostfile {settings.dir}/hostfile_mpi --map-by node "
452-
f"{additional_arguments} "
453-
f"{settings.task_set_1} {settings.app_call} {settings.app_flags}"
454-
)
445+
call = (
446+
# f" cd {settings.run_dir} && "
447+
# f"strace -f -e trace=read,write,open,close,stat,fstat,lseek,access -o /gpfs/fs1/home/tarrafah/strace_n{settings.app_nodes}_p{settings.procs_app}.txt mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
448+
# f" cd {settings.run_dir} && time -p mpiexec --mca errhandler ftmpi --mca mpi_abort_print_stack 1 -np {settings.app_nodes*settings.procs_app} --oversubscribe "
449+
f" cd {settings.run_dir} && time -p mpiexec -np {settings.app_nodes*settings.procs_app} --oversubscribe "
450+
f"--hostfile {settings.dir}/hostfile_mpi --map-by node "
451+
f"{additional_arguments} "
452+
f"{settings.task_set_1} {settings.app_call} {settings.app_flags}"
453+
)
454+
else:
455+
if not settings.exclude_ftio:
456+
additional_arguments += f"LIBGKFS_ENABLE_METRICS=on,LIBGKFS_METRICS_IP_PORT={settings.address_ftio}:{settings.port},"
457+
if not settings.exclude_proxy:
458+
additional_arguments += (
459+
f"LIBGKFS_PROXY_PID_FILE={settings.gkfs_proxyfile},"
460+
)
461+
if not settings.exclude_daemon:
462+
additional_arguments += (
463+
f"LIBGKFS_LOG=info,warnings,errors,"
464+
f"LIBGKFS_LOG_OUTPUT={settings.gekko_client_log},"
465+
f"LIBGKFS_HOSTS_FILE={settings.gkfs_hostfile},"
466+
f"LD_PRELOAD={settings.gkfs_intercept},"
467+
)
455468

456-
# s_call =(
457-
# f" srun "
458-
# f"--export="
459-
# f"LIBGKFS_LOG=info,warnings,errors,"
460-
# f"LIBGKFS_ENABLE_METRICS=on,"
461-
# f"LIBGKFS_METRICS_IP_PORT={settings.address_ftio}:{settings.port},"
462-
# f"LD_PRELOAD={settings.gkfs_intercept},"
463-
# f"LIBGKFS_HOSTS_FILE={settings.gkfs_hostfile},"
464-
# f"LIBGKFS_PROXY_PID_FILE={settings.gkfs_proxyfile},"
465-
# f"LIBGKFS_LOG_OUTPUT={settings.gekko_client_log},"
466-
# f"LD_LIBRARY_PATH={os.environ.get('LD_LIBRARY_PATH')} "
467-
# f"--jobid={settings.job_id} "
468-
# f"{settings.app_nodes_command} --disable-status -N {settings.app_nodes} "
469-
# f"--ntasks={settings.app_nodes*settings.procs_app} --cpus-per-task={settings.procs_app} --ntasks-per-node={settings.procs_app} "
470-
# f"--overcommit --overlap --oversubscribe --mem=0 "
471-
# f"{settings.run_dir}/{settings.app_call} {settings.app_flags}")
469+
app_call = get_executable_realpath(settings.app_call,settings.run_dir)
470+
call = (
471+
f" cd {settings.run_dir} && time -p srun "
472+
f"--export=ALL,{additional_arguments}LD_LIBRARY_PATH={os.environ.get('LD_LIBRARY_PATH')} "
473+
f"--jobid={settings.job_id} {settings.app_nodes_command} --disable-status "
474+
f"-N {settings.app_nodes} --ntasks={settings.app_nodes*settings.procs_app} "
475+
f"--cpus-per-task={settings.procs_app} --ntasks-per-node={settings.procs_app} "
476+
f"--overcommit --overlap --oversubscribe --mem=0 "
477+
f"{app_call} {settings.app_flags}"
478+
)
472479
else:
473480
# Define the call for non-cluster environment
474481
if settings.run_dir:
475482
os.chdir(settings.run_dir)
476483
jit_print(f">> Changing directory to {os.getcwd()}")
477-
484+
478485
if settings.exclude_all:
479486
call = f" time mpiexec -np {settings.procs_app} --oversubscribe {settings.app_call} {settings.app_flags}"
480487
elif settings.exclude_ftio:
@@ -549,14 +556,26 @@ def pre_call(settings: JitSettings) -> None:
549556
# additional_arguments = load_flags(settings)
550557
if isinstance(settings.pre_app_call, str):
551558
call = settings.pre_app_call
552-
if any(x in call for x in ['mpiex','mpirun']):
553-
call = flaged_mpiexec_call(settings,call)
554-
execute_block_and_monitor(settings.verbose, call, settings.app_log, settings.app_err,settings.dry_run)
559+
if any(x in call for x in ["mpiex", "mpirun"]):
560+
call = flaged_mpiexec_call(settings, call)
561+
execute_block_and_monitor(
562+
settings.verbose,
563+
call,
564+
settings.app_log,
565+
settings.app_err,
566+
settings.dry_run,
567+
)
555568
elif isinstance(settings.pre_app_call, list):
556569
for call in settings.pre_app_call:
557-
if any(x in call for x in ['mpiex','mpirun']):
558-
call = flaged_mpiexec_call(settings,call)
559-
execute_block_and_monitor(settings.verbose, call, settings.app_log, settings.app_err,settings.dry_run)
570+
if any(x in call for x in ["mpiex", "mpirun"]):
571+
call = flaged_mpiexec_call(settings, call)
572+
execute_block_and_monitor(
573+
settings.verbose,
574+
call,
575+
settings.app_log,
576+
settings.app_err,
577+
settings.dry_run,
578+
)
560579
# _ = execute_block_and_log(
561580
# settings.pre_app_call, settings.app_log
562581
# )
@@ -572,9 +591,21 @@ def post_call(settings: JitSettings) -> None:
572591
additional_arguments = "" # load_flags(settings)
573592
if isinstance(settings.post_app_call, str):
574593
call = f"{additional_arguments} {settings.post_app_call}"
575-
execute_block_and_monitor(settings.verbose, call, settings.app_log, settings.app_err,settings.dry_run)
594+
execute_block_and_monitor(
595+
settings.verbose,
596+
call,
597+
settings.app_log,
598+
settings.app_err,
599+
settings.dry_run,
600+
)
576601
elif isinstance(settings.post_app_call, list):
577602
call = ""
578603
for s in settings.post_app_call:
579604
call = f"{additional_arguments} {s}"
580-
execute_block_and_monitor(settings.verbose, call, settings.app_log, settings.app_err,settings.dry_run)
605+
execute_block_and_monitor(
606+
settings.verbose,
607+
call,
608+
settings.app_log,
609+
settings.app_err,
610+
settings.dry_run,
611+
)

ftio/api/gekkoFs/jit/setup_helper.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import getopt
44
import os
55
import signal
6+
import shutil
67
from rich.console import Console
78
from ftio.api.gekkoFs.jit.jitsettings import JitSettings
89
from ftio.api.gekkoFs.jit.jittime import JitTime
@@ -1314,6 +1315,36 @@ def clean_call(call:str,procs:int):
13141315
return call, procs
13151316

13161317

1318+
def get_executable_realpath(executable_name, search_location=None):
1319+
"""
1320+
Try to find the real path of an executable.
1321+
Parameters:
1322+
- executable_name (str): The name of the executable.
1323+
1324+
Returns:
1325+
- str: Real path of the executable or its name if not found.
1326+
"""
1327+
if search_location:
1328+
potential_path = os.path.join(search_location, executable_name)
1329+
if os.path.isfile(potential_path) and os.access(potential_path, os.X_OK):
1330+
try:
1331+
return os.path.realpath(potential_path)
1332+
except Exception as e:
1333+
print(f"Warning: Could not resolve real path for {potential_path}: {e}")
1334+
return executable_name
1335+
1336+
# Fall back to searching in the system PATH
1337+
executable_path = shutil.which(executable_name)
1338+
if executable_path:
1339+
try:
1340+
return os.path.realpath(executable_path)
1341+
except Exception as e:
1342+
print(f"Warning: Could not resolve real path for {executable_name}: {e}")
1343+
1344+
# Fallback: return the name if not found
1345+
jit_print(f">> Application: {executable_name}")
1346+
return executable_name
1347+
13171348
def update_hostfile_mpi(settings: JitSettings) -> None:
13181349
# Command to get the list of hostnames for the job
13191350
squeue_command = f"squeue -j {settings.job_id} -o '%N' | tail -n +2"

0 commit comments

Comments
 (0)