diff --git a/examples-proposed/021-ensembles-from-CSV/driver.py b/examples-proposed/021-ensembles-from-CSV/driver.py index e6de4454..f62b7f89 100644 --- a/examples-proposed/021-ensembles-from-CSV/driver.py +++ b/examples-proposed/021-ensembles-from-CSV/driver.py @@ -40,12 +40,16 @@ def step(self, timestamp=0.0): # this information to find the specific instance run directory for a # given set of variables. E.g., the instance corresponding to # {'A' : 2, 'B' : 5.82, 'C' : 'baz'} is probably found in the - # `INSTANCE_1` subdirectory. + # `INSTANCE_1` subdirectory. We also direct the stdout and stderr + # for each instance to `my_logfile.txt` and `my_errfile.txt`, + # respectively. mapping = self.services.run_ensemble(template, variables, run_dir=Path('.').absolute(), name='INSTANCE_', num_nodes=1, - cores_per_instance=1) + cores_per_instance=1, + logfile='my_logfile.txt', + errfile='my_errfile.txt') # Print each mapping of instance name to what variable values were used. for instance in mapping: self.services.info(f'{instance!s}') \ No newline at end of file diff --git a/ipsframework/services.py b/ipsframework/services.py index 525f7805..35aa8420 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -104,8 +104,7 @@ def launch(executable: Any, f'worker {worker.name!s} in {working_dir}') start_time = time.time() - original_directory = os.getcwd() - os.chdir(working_dir) + working_dir_path = Path(working_dir) ret_val = None if isinstance(executable, str): @@ -114,29 +113,39 @@ def launch(executable: Any, # Do we write the Popen stdout to sys.stdout or to a file? subprocess_stdout = sys.stdout + close_stdout = False # is true if we need to later close the file try: log_filename = kwargs['logfile'] except KeyError: log.info('No logfile specified, using stdout for task output') else: - subprocess_stdout = open(log_filename, 'w') - log.info(f'Task output log file: {log_filename}') + log_path = Path(log_filename) + if not log_path.is_absolute(): + log_path = working_dir_path / log_path + subprocess_stdout = open(log_path, 'w') + close_stdout = True # Welp, gotta close it now + log.info(f'Task output log file: {log_path}') # Repeat the same for stderr - task_stderr = subprocess.STDOUT + subprocess_errfile = subprocess.STDOUT + close_stderr = False try: - subprocess_stderr = kwargs['errfile'] + subprocess_errfile = kwargs['errfile'] except KeyError: log.info('No errfile specified, using STDOUT for task errors') else: + err_path = Path(subprocess_errfile) + if not err_path.is_absolute(): + err_path = working_dir_path / err_path try: - task_stderr = open(subprocess_stderr, 'w') + subprocess_errfile = open(err_path, 'w') except OSError: - log.info(f'Could not open errfile {subprocess_stderr}, ' + log.info(f'Could not open errfile {err_path}, ' f'using STDOUT for task errors') - task_stderr = subprocess.STDOUT + subprocess_errfile = subprocess.STDOUT else: - log.info(f'Task error log file: {subprocess_stderr}') + close_stderr = True + log.info(f'Task error log file: {err_path}') task_env = kwargs.get('task_env', {}) new_env = os.environ.copy() @@ -191,69 +200,76 @@ def launch(executable: Any, }) cmd_lst = cmd.split() + process = None try: - process = subprocess.Popen(cmd_lst, stdout=subprocess_stdout, - stderr=task_stderr, - cwd=working_dir, - preexec_fn=os.setsid, env=new_env) # noqa: PLW1509 (TODO: look into this to potentially avoid deadlocks) - except Exception as e: - worker.log_event('ips', - { - 'eventType' : 'IPS_DASK_TASK_END', - 'event_time': time.time(), - 'state' : 'Failed', - 'comment' : f'task_name = {task_name} ' - f'Exception when calling ' - f'{executable!s}: {e!s}', - 'operation' : ' '.join(map(str, args)), - }) - log.error(f'Failed to launch task {task_name} with ' - f'command {cmd}: {e}') - raise - finally: - os.chdir(original_directory) + try: + process = subprocess.Popen(cmd_lst, + stdout=subprocess_stdout, + stderr=subprocess_errfile, + cwd=working_dir_path, + preexec_fn=os.setsid, env=new_env) # noqa: PLW1509 (TODO: look into this to potentially avoid deadlocks) + except Exception as e: + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Failed', + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{executable!s}: {e!s}', + 'operation' : ' '.join(map(str, args)), + }) + log.error(f'Failed to launch task {task_name} with ' + f'command {cmd}: {e}') + raise - try: - ret_val = process.wait(timeout) - finish_time = time.time() - worker.log_event('ips', - { - 'eventType' : 'IPS_DASK_TASK_END', - 'event_time' : finish_time, - 'state' : 'Succeeded', - 'comment' : f'task_name = ' - f'{task_name},' - f' elapsed time = ' - f'{finish_time - start_time:.2f}s', - 'start_time' : start_time, - 'elapsed_time': finish_time - start_time, - 'target' : executable, - 'operation' : ' '.join(map(str, args)), - }) + try: + ret_val = process.wait(timeout) + finish_time = time.time() + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time' : finish_time, + 'state' : 'Succeeded', + 'comment' : f'task_name = ' + f'{task_name},' + f' elapsed time = ' + f'{finish_time - start_time:.2f}s', + 'start_time' : start_time, + 'elapsed_time': finish_time - start_time, + 'target' : executable, + 'operation' : ' '.join(map(str, args)), + }) - except subprocess.TimeoutExpired: - worker.log_event('ips', - { - 'eventType' : 'IPS_DASK_TASK_END', - 'event_time': time.time(), - 'state' : 'Timed out', - 'comment' : f'task_name = {task_name}, ' - f'timed-out after ' - f'{timeout}s'}) - process.kill() - log.error(f'Task {task_name} with command {cmd} timed out ' - f'after {timeout}s') - ret_val = -1 - except Exception as e: - worker.log_event('ips', - { - 'eventType' : 'IPS_DASK_TASK_END', - 'event_time': time.time(), - 'state' : 'Failed', - 'comment' : f'task_name = {task_name} ' - f'Exception when calling ' - f'{executable!s}: {e!s}'}) - log.error(f'Task {task_name} with command {cmd} failed with {e!s}') + except subprocess.TimeoutExpired: + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Timed out', + 'comment' : f'task_name = {task_name}, ' + f'timed-out after ' + f'{timeout}s'}) + process.kill() + log.error(f'Task {task_name} with command {cmd} timed out ' + f'after {timeout}s') + ret_val = -1 + except Exception as e: + worker.log_event('ips', + { + 'eventType' : 'IPS_DASK_TASK_END', + 'event_time': time.time(), + 'state' : 'Failed', + 'comment' : f'task_name = {task_name} ' + f'Exception when calling ' + f'{executable!s}: {e!s}'}) + log.error(f'Task {task_name} with command {cmd} failed with {e!s}') + finally: + if close_stdout: + subprocess_stdout.close() + + if close_stderr: + subprocess_errfile.close() elif isinstance(executable, Callable): # binary not a string, but is a python callable, so we call it directly # with the given *args @@ -268,6 +284,9 @@ def launch(executable: Any, }) try: + original_dir = Path.cwd() + + os.chdir(str(working_dir_path)) ret_val = executable(*args) finish_time = time.time() @@ -297,14 +316,14 @@ def launch(executable: Any, f'{executable!s}: {e!s}'}) log.error(f'Task {task_name} with callable {executable!s} failed ' f'with {e!s}') + finally: + os.chdir(str(original_dir)) else: - os.chdir(original_directory) raise RuntimeError(f'Binary argument {executable!s} is not a string or ' f'callable, cannot launch task {task_name}') log.info(f'Task {task_name} finished with return value: {ret_val}') - os.chdir(original_directory) return task_name, ret_val @@ -2279,15 +2298,40 @@ def submit_tasks( submitted. Optionally, dask can be used to schedule and run the task pool. + + :param task_pool_name: name of task pool to submit + :param block: if True, return when all tasks have been launched. + If False, return when all tasks that can be launched immediately have + been launched. + :param use_dask: if True, use dask to schedule and run the task pool + :param dask_nodes: if using dask, number of nodes to use + :param dask_ppw: if using dask, number of processes per worker to use; + if None, use number of cores per node + :param launch_interval: number of seconds to wait between launching tasks + :param use_shifter: if True, use shifter to run tasks in a container + :param shifter_args: args for running under shifter + :param dask_worker_plugin: plugin class for Dask workers + :param dask_worker_per_gpu: how many Dask workers per GPU? + :param oversubscribe: if True, oversubscribe available resources + :param hwthreads: if True, use hardware threads as the basis for + resource allocation; if False, use physical cores as the basis for + resource allocation + :returns: task return value """ start_time = time.time() - self._send_monitor_event('IPS_TASK_POOL_BEGIN', 'task_pool = %s ' % task_pool_name) + self._send_monitor_event('IPS_TASK_POOL_BEGIN', + 'task_pool = %s ' % task_pool_name) task_pool: TaskPool = self.task_pools[task_pool_name] retval = task_pool.submit_tasks( - block, use_dask, dask_nodes, dask_ppw, launch_interval, use_shifter, shifter_args, dask_worker_plugin, dask_worker_per_gpu, oversubscribe, hwthreads + block, use_dask, dask_nodes, dask_ppw, launch_interval, + use_shifter, shifter_args, dask_worker_plugin, + dask_worker_per_gpu, oversubscribe, hwthreads ) elapsed_time = time.time() - start_time - self._send_monitor_event('IPS_TASK_POOL_END', 'task_pool = %s elapsed time = %.2f S' % (task_pool_name, elapsed_time), elapsed_time=elapsed_time) + self._send_monitor_event('IPS_TASK_POOL_END', + 'task_pool = %s elapsed time = %.2f S' % ( + task_pool_name, elapsed_time), + elapsed_time=elapsed_time) return retval def get_finished_tasks(self, task_pool_name: str): @@ -2429,6 +2473,8 @@ def run_ensemble( cores_per_instance: Optional[int] = None, oversubscribe: bool = False, hwthreads: bool = False, + logfile: Union[str, os.PathLike] = None, + errfile: Union[str, os.PathLike] = None, ): """Run ensemble of simulations given the template and variables. @@ -2456,6 +2502,14 @@ def run_ensemble( config file created from `template` with `?` variables replaced with the values from `variables`. + .. seealso:: + :py:func:`ipsutil.params_from_csv()` for a convenient way to generate + the `variables` dict from a csv file.) + + .. note:: + `logfile` and `errfile` will write to the working directory local + to the ensemble instance. + TODO be able to specify the number of cores per instance :param template: configuration template file @@ -2470,6 +2524,8 @@ def run_ensemble( :param oversubscribe: Whether to allow oversubscription of nodes when launching the ensemble runs. Default is False. :param hwthreads: Whether to use hardware threads + :param logfile: Optional file name in which to write stdout + :param errfile: Optional file name in which to write stderr :returns: a list of dicts mapping created subdirs to simulation names and their parameters """ @@ -2737,13 +2793,20 @@ def send_ensemble_instance_to_portal(ensemble_name: str, data_path: Path) -> Non # IPS run pointed to that config file. args = [f'--simulation={simulation_filename}', f'--log={log_file}', f'--platform={platform_filename!s}'] + kwargs = {} # optionally add logfile and errfile + if logfile: + kwargs['logfile'] = logfile + if errfile: + kwargs['errfile'] = errfile + + if self.fwk.logger.getEffectiveLevel() == logging.DEBUG: # If we're in debug mode, then also pass the debug flag. # May as well pass in the --verbose, too. args.insert(1, '--debug') args.insert(1, '--verbose') - self.add_task(task_pool_name, instance[0], 1, working_dir, 'ips.py', *args) + self.add_task(task_pool_name, instance[0], 1, working_dir, 'ips.py', *args, **kwargs) try: # Note that we *always* use Dask to run the ensemble tasks @@ -3014,7 +3077,7 @@ def submit_dask_tasks( dask_worker_plugin=None, dask_worker_per_gpu=False, oversubscribe=False, - hwthreads=False, + hwthreads=False ): """Launch tasks in *queued_tasks* using dask. @@ -3310,7 +3373,7 @@ def submit_tasks( dask_worker_plugin=None, dask_worker_per_gpu=False, oversubscribe=False, - hwthreads=False, + hwthreads=False ): """Launch tasks in *queued_tasks*. Finished tasks are handled before launching new ones. If *block* is ``True``, the number of @@ -3349,22 +3412,29 @@ def submit_tasks( :param hwthreads: If True then use hardware threads when launching tasks. Default is False. :type hwthreads: bool + :returns: """ - if use_dask: if TaskPool.dask and TaskPool.distributed and self.serial_pool: self.dask_pool = True if use_shifter and not self.shifter: - self.services.error('Requested to run dask within shifter but shifter not available') + self.services.error('Requested to run dask within shifter ' + 'but shifter not available') raise RuntimeError('shifter not found') else: return self.submit_dask_tasks( - block, dask_nodes, dask_ppw, use_shifter, shifter_args, dask_worker_plugin, dask_worker_per_gpu, oversubscribe, hwthreads + block, dask_nodes, dask_ppw, use_shifter, + shifter_args, dask_worker_plugin, + dask_worker_per_gpu, oversubscribe, hwthreads ) elif not TaskPool.dask or not TaskPool.distributed: - raise RuntimeError('Requested use_dask but cannot because import dask or distributed failed') + raise RuntimeError( + 'Requested use_dask but cannot because import dask or ' + 'distributed failed') elif not self.serial_pool: - self.services.warning('Requested use_dask but cannot because multiple processors requested') + self.services.warning( + 'Requested use_dask but cannot because multiple ' + 'processors requested') submit_count = 0 # Make sure any finished tasks are handled before attempting to submit @@ -3373,7 +3443,8 @@ def submit_tasks( while True: if len(self.queued_tasks) == 0: break - active_tasks = self.services.launch_task_pool(self.name, launch_interval) + active_tasks = self.services.launch_task_pool(self.name, + launch_interval) for task_name, task_id in active_tasks.items(): self.active_tasks[task_id] = self.queued_tasks.pop(task_name) submit_count += 1 @@ -3458,6 +3529,8 @@ def get_dask_finished_tasks_status(self): :return: dict mapping task name to exit status :rtype: dict """ + result = None + if self.dask_client is None: # FIXME How does this happen and is it ok when it does? self.services.warning('No dask client in call to finished tasks status') @@ -3516,9 +3589,10 @@ def get_dask_finished_tasks_status(self): # is doubtful. return dict(result) - self.services.debug('get_dask_finished_tasks_status: no result, returning None') + self.services.debug('get_dask_finished_tasks_status: no result, ' + 'returning None') - return result # which will be none + return result # which will be none or {} def get_finished_tasks_status(self): """