Skip to content

Commit 2ea447b

Browse files
authored
Merge pull request #222 from byexamples/Issue-220-Make-Plugins-Support-Multiprocessing-Spawn
Issue 220 make plugins support multiprocessing spawn
2 parents 0375a6f + 3838adc commit 2ea447b

8 files changed

Lines changed: 367 additions & 32 deletions

File tree

byexample/cfg.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ class Config(collections.abc.Mapping):
1919
2020
The rest of the keys-values must be of immutable types.
2121
22+
A special key 'prepare_subprocess_call' is expected to be a partial
23+
function, which is assumed to have immutable bound parameters.
24+
2225
To use Config in a multithread/multiprocess environment,
2326
you need to call copy() to get an independent copy to work with.
2427
@@ -39,7 +42,10 @@ def __len__(self):
3942

4043
def _ensure_cfg_is_constant(self):
4144
const_types = (int, frozenset, str, bool, bytes, type(None))
42-
exception_keys = ('options', 'output', 'registry', 'namespaces')
45+
exception_keys = (
46+
'options', 'output', 'registry', 'namespaces',
47+
'prepare_subprocess_call'
48+
)
4349
for k, v in self._d.items():
4450
if k in exception_keys:
4551
continue
@@ -58,6 +64,9 @@ def copy(self, patch={}):
5864
Assuming that all the values are constant (immutable),
5965
the copy does not incur in the costs of a real copy.
6066
67+
This includes 'prepare_subprocess_call' for which only
68+
a reference is copied and not a full copy.
69+
6170
Exceptions to this are:
6271
- options: which a real copy is made (see Options.copy)
6372
- output: a file which it is NOT copied

byexample/init.py

Lines changed: 131 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from __future__ import unicode_literals
2-
import sys, pkgutil, inspect, pprint, os, operator
2+
import sys, pkgutil, inspect, pprint, os, operator, traceback, functools
33
import importlib.util
44

55
from itertools import chain as chain_iters
@@ -91,6 +91,38 @@ def _is_empty(self):
9191
return not bool(self._attribute_names())
9292

9393

94+
def import_and_register_modules_iter(dirnames):
    ''' Import and register the (python) modules located in the given
        directories.

        The loaded modules will be registered and accessible
        from sys.modules as any imported python module.

        This function will not try to instantiate any
        object from the loaded modules.

        Moreover, this function will not assume that it is running
        in the main process of byexample so it will not use anything
        from byexample's runtime like clog() as this function may
        be called by a child process.

        This is a generator: for each module found it yields a tuple
        (path, name, module, err) where:
         - path: the directory where the module was found
         - name: the name of the module
         - module: the imported module object, or None if the import
           failed
         - err: None on success, otherwise the exception raised while
           importing the module (the exception is not propagated)

        A module that failed to import is not registered in sys.modules.
        '''
    for importer, name, _is_pkg in pkgutil.iter_modules(dirnames):
        path = importer.path

        # Reset the per-module state on each iteration: otherwise
        # a failure before 'module' gets assigned would either raise
        # an UnboundLocalError at the yield (first iteration) or leak
        # the module imported in the previous iteration.
        module = None
        err = None

        try:
            spec = importer.find_spec(name)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            # Register the module only once it was fully executed
            sys.modules[name] = module

        except Exception as e:
            err = e

            # Do not hand a half-initialized module to the caller
            module = None

        yield (path, name, module, err)
124+
125+
94126
@log_context('byexample.load')
95127
@profile
96128
def load_modules(dirnames, cfg):
@@ -103,18 +135,13 @@ def load_modules(dirnames, cfg):
103135
'zdelimiters': {},
104136
}
105137
namespaces_by_class = {}
106-
for importer, name, is_pkg in pkgutil.iter_modules(dirnames):
107-
path = importer.path
108-
109-
clog().debug("From '%s' loading '%s'...", path, name)
110-
111-
try:
112-
spec = importer.find_spec(name)
113-
module = importlib.util.module_from_spec(spec)
114-
spec.loader.exec_module(module)
115-
except Exception as e:
116-
clog().info(
117-
"From '%s' loading '%s'...failed: %s", path, name, str(e)
138+
for path, name, module, err in import_and_register_modules_iter(dirnames):
139+
if err:
140+
clog().exception(
141+
"From '%s' loading module '%s' failed. Skipping.",
142+
path,
143+
name,
144+
exc_info=err
118145
)
119146
continue
120147

@@ -125,7 +152,11 @@ def load_modules(dirnames, cfg):
125152
):
126153
stability = 'experimental/%s?' % str(stability)
127154

128-
clog().chat("From '%s' loaded '%s' (%s)", path, name, stability)
155+
clog().chat(
156+
"From '%s' loaded module '%s' (%s). Searching for extensions...",
157+
path, name, stability
158+
)
159+
129160
for klass, key, is_multikey, what in [
130161
(ExampleRunner, 'language', False, 'runners'),
131162
(ExampleParser, 'language', False, 'parsers'),
@@ -596,6 +627,21 @@ def init_byexample(args, sharer):
596627
cfg['use_colors'] &= are_tty_colors_supported(cfg['output'])
597628
cfg['selected_languages'] = frozenset(args.languages)
598629

630+
# Make a partial application of _prepare_subprocess_call(), binding
631+
# all the necessary parameters to import and register the byexample
632+
# modules (again) in a subprocess.
633+
#
634+
# The bound parameters are constant so the function, despite having
635+
# state, is effectively stateless (its state is constant, immutable).
636+
# Moreover, _prepare_subprocess_call() is thread-safe so the resulting
637+
# partial-bound function is thread-safe too.
638+
#
639+
# See _prepare_subprocess_call().
640+
prepare_subprocess_call = functools.partial(
641+
_prepare_subprocess_call, dirnames=tuple(args.modules_dirs)
642+
)
643+
cfg['prepare_subprocess_call'] = prepare_subprocess_call
644+
599645
registry, namespaces_by_class = load_modules(args.modules_dirs, cfg)
600646

601647
allowed_languages = get_allowed_languages(registry, args.languages)
@@ -661,6 +707,70 @@ def init_byexample(args, sharer):
661707
return testfiles, Config(cfg)
662708

663709

710+
def _subprocess_trampoline(
    dirnames, serialized_func, serialized_args, serialized_kwargs
):
    # This is the entry point executed in the *child* process.
    #
    # The order matters: first the byexample modules found in <dirnames>
    # are imported and registered, and only after that the target
    # function and its arguments are deserialized. The deserialization
    # may need those modules to be importable.
    #
    # A fresh subprocess has none of the modules loaded. A forked
    # subprocess is a copy of the parent python process so it may have
    # them loaded already. Note that the import below is done in both
    # cases, unconditionally.
    #
    # The results of the import (including any import error) are
    # discarded: the generator is consumed just for its side effects.
    #
    # For now it is unclear if, besides the loading, we want to do
    # more here like the instantiation of the plugins.
    from .init import import_and_register_modules_iter
    for _ in import_and_register_modules_iter(dirnames):
        pass

    from multiprocessing.reduction import ForkingPickler
    loads = ForkingPickler.loads

    # Deserialize in the same order: function, args and kwargs
    target, args, kwargs = [
        loads(blob)
        for blob in (serialized_func, serialized_args, serialized_kwargs)
    ]

    return target(*args, **kwargs)
736+
737+
738+
def _prepare_subprocess_call(target, dirnames, *, args=(), kwargs={}):
    ''' Wrap the given target function so it can be executed in a
        separated process (child process).

        The target, its positional arguments <args> and its keyword
        arguments <kwargs> are serialized here, in the caller's process.
        In the child process, a trampoline function will first (re)import
        and (re)register the modules found in <dirnames> (the ones once
        loaded by byexample in the parent process) and only then it will
        deserialize and call the target.

        The re-import and re-registration is needed because the child
        may be an independent fresh Python process without any idea
        of how to load byexample modules.

        Return a dictionary with the keys 'target' and 'args' suitable
        to call multiprocessing.Process.

        Note: no user code should call _prepare_subprocess_call directly.
        Instead, call a partial bound function from the Config cfg object
        given to each extension (ExampleFinder, ExampleParser, Concern, ...).
        This partial function will not require the <dirnames> argument.
        '''
    # Implementation note: this function must be thread-safe because it
    # may be called from different threads. It only works with its own
    # parameters and locals.
    from multiprocessing.reduction import ForkingPickler

    # Serialize, in order, the function, the args and the kwargs
    # as plain (immutable) bytes objects.
    blobs = tuple(
        bytes(ForkingPickler.dumps(obj)) for obj in (target, args, kwargs)
    )

    return {
        'target': _subprocess_trampoline,
        'args': (dirnames, ) + blobs,
    }
772+
773+
664774
@profile
665775
def init_worker(cfg, job_num):
666776
''' Initialize a worker with worker/job number is passed
@@ -677,10 +787,15 @@ def init_worker(cfg, job_num):
677787
If the recreation process is thread safe (depends of the objects'
678788
implementations), then init_worker is thread safe.
679789
'''
680-
# let the rest of byexample for this worker to know
790+
patch = {}
791+
# Patch the job_number: let the rest of byexample for this worker to know
681792
# in which worker is on
682793
assert cfg['job_number'] == '__main__'
683-
cfg = cfg.copy(patch={'job_number': int(job_num)})
794+
patch['job_number'] = int(job_num)
795+
796+
# Get an independent copy of cfg (and therefore, thread-safe)
797+
# with some keys patched
798+
cfg = cfg.copy(patch=patch)
684799

685800
concerns = ConcernComposite(**cfg)
686801

byexample/log.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,11 @@ def exception(self, msg, *args, **kwargs):
197197
message, augmented by kwargs['where'] and exception.where
198198
attributes (contextual messages).
199199
200-
If <msg> is not None, run as usual (logging.Logger.exception)
200+
If exc_info is given and it is an exception object, use
201+
that instead of the current caught exception.
202+
203+
If <msg> is not None nor empty,
204+
run as usual (logging.Logger.exception)
201205
'''
202206
exc_info = kwargs.pop('exc_info', True)
203207
where_default = kwargs.pop('where', None)
@@ -207,15 +211,19 @@ def exception(self, msg, *args, **kwargs):
207211
if where_default:
208212
msg += ' {where_default}'
209213

210-
ex = sys.exc_info()[1]
214+
if isinstance(exc_info, BaseException):
215+
ex = exc_info
216+
else:
217+
ex = sys.exc_info()[1]
218+
211219
where = getattr(ex, 'where', None)
212220
if where:
213221
msg += ', {where}'
214222

215223
msg += ':'
216224
msg = msg.format(where_default=where_default, where=where)
217225

218-
return self.error(msg, exc_info=True, **kwargs)
226+
return self.error(msg, exc_info=exc_info, **kwargs)
219227
else:
220228
return Logger.exception(
221229
self, msg, *args, exc_info=exc_info, **kwargs

0 commit comments

Comments
 (0)