6363 IDLE_THRESHOLD , MAX_IDLE_HEARTBEATS ,
6464 MAX_TIME_BETWEEN_HEARTBEATS , NUM_RESUBMITS ,
6565 SEND_ERROR_MAIL , SMTP_SERVER , USE_MEM_FREE ,
66- DEFAULT_TEMP_DIR )
66+ DEFAULT_TEMP_DIR , DEFAULT_PAR_ENV )
6767from gridmap .data import zdumps , zloads
6868from gridmap .runner import _heart_beat
6969
@@ -116,11 +116,11 @@ class Job(object):
116116 'cause_of_death' , 'num_resubmits' , 'home_address' ,
117117 'log_stderr_fn' , 'log_stdout_fn' , 'timestamp' , 'host_name' ,
118118 'heart_beat' , 'track_mem' , 'track_cpu' , 'interpreting_shell' ,
119- 'copy_env' )
119+ 'copy_env' , 'par_env' )
120120
121121 def __init__ (self , f , args , kwlist = None , cleanup = True , mem_free = "1G" ,
122122 name = 'gridmap_job' , num_slots = 1 , queue = DEFAULT_QUEUE ,
123- interpreting_shell = None , copy_env = True , add_env = None ):
123+ interpreting_shell = None , copy_env = True , add_env = None , par_env = DEFAULT_PAR_ENV ):
124124 """
125125 Initializes a new Job.
126126
@@ -149,6 +149,8 @@ def __init__(self, f, args, kwlist=None, cleanup=True, mem_free="1G",
149149 Overwrites variables which already exist due to
150150 ``copy_env=True``.
151151 :type add_env: dict
152+ :param par_env: parallel environment to use.
153+ :type par_env: str
152154 """
153155 self .track_mem = []
154156 self .track_cpu = []
@@ -195,6 +197,7 @@ def _add_env(env_vars):
195197 if add_env is not None :
196198 _add_env (add_env )
197199 self .working_dir = os .getcwd ()
200+ self .par_env = par_env
198201
199202 @property
200203 def function (self ):
@@ -263,7 +266,7 @@ def native_specification(self):
263266 if self .mem_free and USE_MEM_FREE :
264267 ret += " -l mem_free={}" .format (self .mem_free )
265268 if self .num_slots and self .num_slots > 1 :
266- ret += " -pe smp {}" .format (self .num_slots )
269+ ret += " -pe {} {}" .format (self . par_env , self .num_slots )
267270 if self .white_list :
268271 ret += " -l h={}" .format ('|' .join (self .white_list ))
269272 if self .queue :
@@ -929,7 +932,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
929932 num_slots = 1 , temp_dir = DEFAULT_TEMP_DIR , white_list = None ,
930933 queue = DEFAULT_QUEUE , quiet = True , local = False , max_processes = 1 ,
931934 interpreting_shell = None , copy_env = True , add_env = None ,
932- completion_mail = False , require_cluster = False ):
935+ completion_mail = False , require_cluster = False , par_env = DEFAULT_PAR_ENV ):
933936 """
934937 Maps a function onto the cluster.
935938
@@ -978,6 +981,8 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
978981 Overwrites variables which already exist due to
979982 ``copy_env=True``.
980983 :type add_env: dict
984+ :param par_env: parallel environment to use.
985+ :type par_env: str
981986 :param completion_mail: whether to send an e-mail upon completion of all
982987 jobs
983988 :type completion_mail: boolean
@@ -993,7 +998,7 @@ def grid_map(f, args_list, cleanup=True, mem_free="1G", name='gridmap_job',
993998 cleanup = cleanup , mem_free = mem_free ,
994999 name = '{}{}' .format (name , job_num ), num_slots = num_slots ,
9951000 queue = queue , interpreting_shell = interpreting_shell ,
996- copy_env = copy_env , add_env = add_env )
1001+ copy_env = copy_env , add_env = add_env , par_env = par_env )
9971002 for job_num , args in enumerate (args_list )]
9981003
9991004 # process jobs
0 commit comments