Skip to content

Commit b263ba7

Browse files
committed
Offer a prepare_subprocess_call to make a func multiprocessing safe
When multiprocessing uses another start method different of 'fork', it requires that the target function and its arguments can be pickled. In particular, multiprocessing needs to pickle them in the parent process and un-pickle them in the child process. Due how pickling works in Python, pickling a function only involves storing the information needed to reload it: the bytecode is never stored. This makes pickling particular tricky in byexample: if a we want to call a function that it is from one of the byexample modules/plugins, the pickling will fail. It will not fail when multiprocessing serialize it (dumps) but when it deserialize it (loads) because in the child process, the byexample modules/plugins will not be loaded in sys.modules and because they are not in the sys.path (in principle), Python will not be able to find them. _prepare_subprocess_call() can wrap the target function and its arguments and replacing the target by a _subprocess_trampoline function that will call the former. This _subprocess_trampoline will do all the bootstraping needed in the child process, including the (re)loading of the modules/plugins, to make the un-pickling work. Most of these details are hidden from the user (plugin developer). He/she is only required to call prepare_subprocess_call() and use the returned target/arguments in replace of his/her. The prepare_subprocess_call() is a partial bound function of _prepare_subprocess_call(). The former can be obtained optionally from the extension constructor (__init__ method of ExampleParser, ExampleFinder, ExampleRunner, ZoneDelimiter and Concern). The function is thread-safe and it will accessible both in the main byexample process and in each worker thread.
1 parent 83e1496 commit b263ba7

2 files changed

Lines changed: 90 additions & 2 deletions

File tree

byexample/cfg.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ class Config(collections.abc.Mapping):
1919
2020
The rest of the keys-values must be of immutable types.
2121
22+
A special key 'prepare_subprocess_call' is expected to be a partial
23+
function, which it is assumed to have immutable bound parameters.
24+
2225
To use Config in a multithread/multiprocess environment,
2326
you need to call copy() to get an independent copy to work with.
2427
@@ -39,7 +42,10 @@ def __len__(self):
3942

4043
def _ensure_cfg_is_constant(self):
4144
const_types = (int, frozenset, str, bool, bytes, type(None))
42-
exception_keys = ('options', 'output', 'registry', 'namespaces')
45+
exception_keys = (
46+
'options', 'output', 'registry', 'namespaces',
47+
'prepare_subprocess_call'
48+
)
4349
for k, v in self._d.items():
4450
if k in exception_keys:
4551
continue
@@ -58,6 +64,9 @@ def copy(self, patch={}):
5864
Assuming that all the values are constant (immutable),
5965
the copy does not incur in the costs of a real copy.
6066
67+
This includes 'prepare_subprocess_call' for which only
68+
a reference is copied and not a full copy.
69+
6170
Exceptions to this are:
6271
- options: which a real copy is made (see Options.copy)
6372
- output: a file which it is NOT copied

byexample/init.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from __future__ import unicode_literals
2-
import sys, pkgutil, inspect, pprint, os, operator, traceback
2+
import sys, pkgutil, inspect, pprint, os, operator, traceback, functools
33
import importlib.util
44

55
from itertools import chain as chain_iters
@@ -627,6 +627,21 @@ def init_byexample(args, sharer):
627627
cfg['use_colors'] &= are_tty_colors_supported(cfg['output'])
628628
cfg['selected_languages'] = frozenset(args.languages)
629629

630+
# Make a partial application of _prepare_subprocess_call(), binding
631+
# all the necessary parameters to import and register the byexample
632+
# modules (again) in a subprocess.
633+
#
634+
# The bound parameters are constant so the function, despite having
635+
# state, it is actually stateless (its state is constant, immutable)
636+
# Moreover, _prepare_subprocess_call() is thread-safe so the resulting
637+
# partial-bound function is thread-safe too.
638+
#
639+
# See _prepare_subprocess_call().
640+
prepare_subprocess_call = functools.partial(
641+
_prepare_subprocess_call, dirnames=tuple(args.modules_dirs)
642+
)
643+
cfg['prepare_subprocess_call'] = prepare_subprocess_call
644+
630645
registry, namespaces_by_class = load_modules(args.modules_dirs, cfg)
631646

632647
allowed_languages = get_allowed_languages(registry, args.languages)
@@ -692,6 +707,70 @@ def init_byexample(args, sharer):
692707
return testfiles, Config(cfg)
693708

694709

710+
def _subprocess_trampoline(
711+
dirnames, serialized_func, serialized_args, serialized_kwargs
712+
):
713+
# All of this happens in the *child* process
714+
# We reload the modules if they weren't loaded yet
715+
# and only then we deserialize the target function and we
716+
# call it.
717+
#
718+
# If _subprocess_trampoline is called in a fresh subprocess,
719+
# we are sure that no module was loaded yet however, it is
720+
# possible that the user runs a subprocess using forking
721+
# which makes a copy of the python process (parent) and therefore
722+
# it will have the modules loaded already.
723+
#
724+
# By the moment it is unclear if in addition to the loading we want
725+
# to do more like the instantiation of the plugins.
726+
from .init import import_and_register_modules_iter
727+
_ = list(import_and_register_modules_iter(dirnames))
728+
729+
import multiprocessing.reduction
730+
fpickler = multiprocessing.reduction.ForkingPickler
731+
target = fpickler.loads(serialized_func)
732+
args = fpickler.loads(serialized_args)
733+
kwargs = fpickler.loads(serialized_kwargs)
734+
735+
return target(*args, **kwargs)
736+
737+
738+
def _prepare_subprocess_call(target, dirnames, *, args=(), kwargs={}):
739+
''' Prepare the given target function to be executable in a separated
740+
process (child process).
741+
742+
The preparation includes the (re)import and (re)registration of
743+
the modules found in <dirnames>, once loaded by byexample in the parent
744+
process.
745+
746+
This re-import and re-registration within the child process
747+
is needed because the child may be an independent fresh Python
748+
process without any idea of how to load byexample modules.
749+
750+
_prepare_subprocess_call returns a dictionary with keys 'target'
751+
and 'args' suitable to call multiprocessing.Process.
752+
753+
Note: no user code should call _prepare_subprocess_call directly.
754+
Instead, call a partial bound function from the Config cfg object
755+
given to each extension (ExampleFinder, ExampleParser, Concern, ...).
756+
This partial function will not require the <dirnames> argument.
757+
'''
758+
# Implementation note: this function must be thread-safe because it
759+
# may be called from different threads.
760+
import multiprocessing.reduction
761+
fpickler = multiprocessing.reduction.ForkingPickler
762+
763+
serialized_func = bytes(fpickler.dumps(target))
764+
serialized_args = bytes(fpickler.dumps(args))
765+
serialized_kwargs = bytes(fpickler.dumps(kwargs))
766+
767+
trampoline_args = (
768+
dirnames, serialized_func, serialized_args, serialized_kwargs
769+
)
770+
771+
return {'target': _subprocess_trampoline, 'args': trampoline_args}
772+
773+
695774
@profile
696775
def init_worker(cfg, job_num):
697776
''' Initialize a worker with worker/job number is passed

0 commit comments

Comments
 (0)