Skip to content

Commit 8449bb5

Browse files
committed
Parse files in parallel when possible
1 parent a7bdffd commit 8449bb5

3 files changed

Lines changed: 133 additions & 48 deletions

File tree

mypy/build.py

Lines changed: 126 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
import sys
2727
import time
2828
import types
29-
from collections.abc import Callable, Iterator, Mapping, Sequence, Set as AbstractSet
29+
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence, Set as AbstractSet
30+
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait
3031
from heapq import heappop, heappush
3132
from textwrap import dedent
3233
from typing import (
@@ -947,6 +948,63 @@ def dump_stats(self) -> None:
947948
for key, value in sorted(self.stats_summary().items()):
948949
print(f"{key + ':':24}{value}")
949950

951+
def parse_all(self, states: Iterable[State]) -> None:
952+
"""Parse multiple files in parallel (if possible) and compute dependencies.
953+
954+
Note: this duplicates a bit of logic from State.parse_file(). This is done
955+
as a micro-optimization to parallelize only those parts of the code that
956+
can be parallelized efficiently.
957+
"""
958+
if self.options.native_parser:
959+
futures = []
960+
parsed_states = set()
961+
# TODO: we should probably use psutil instead.
962+
# With psutil we can get the number of physical cores, while all stdlib
963+
# functions include virtual cores (which is not optimal for performance).
964+
available_threads = len(os.sched_getaffinity(0))
965+
# For some reason there is no visible improvement with more than 8 threads.
966+
# TODO: consider writing our own ThreadPool as an optimization.
967+
with ThreadPoolExecutor(max_workers=min(available_threads, 8)) as executor:
968+
for state in states:
969+
state.needs_parse = False
970+
if state.tree is not None:
971+
# The file was already parsed.
972+
continue
973+
# New parser reads source from file directly, we do this only for
974+
# the side effect of parsing inline mypy configurations.
975+
state.get_source()
976+
if state.id not in self.ast_cache:
977+
futures.append(executor.submit(state.parse_file_inner, state.source or ""))
978+
parsed_states.add(state)
979+
else:
980+
self.log(f"Using cached AST for {state.xpath} ({state.id})")
981+
state.tree, state.early_errors = self.ast_cache[state.id]
982+
for fut in wait(futures, return_when=FIRST_EXCEPTION).done:
983+
# This will raise exceptions, if any.
984+
fut.result()
985+
986+
for state in states:
987+
assert state.tree is not None
988+
if state in parsed_states:
989+
state.early_errors = list(self.errors.error_info_map.get(state.xpath, []))
990+
state.semantic_analysis_pass1()
991+
self.ast_cache[state.id] = (state.tree, state.early_errors)
992+
self.modules[state.id] = state.tree
993+
state.check_blockers()
994+
state.setup_errors()
995+
else:
996+
# Old parser cannot be parallelized.
997+
for state in states:
998+
state.parse_file()
999+
1000+
for state in states:
1001+
state.compute_dependencies()
1002+
if self.workers and state.tree:
1003+
# We don't need imports in coordinator process anymore, we parse only to
1004+
# compute dependencies.
1005+
state.tree.imports = []
1006+
del self.ast_cache[state.id]
1007+
9501008
def use_fine_grained_cache(self) -> bool:
9511009
return self.cache_enabled and self.options.use_fine_grained_cache
9521010

@@ -2505,8 +2563,7 @@ def new_state(
25052563
# we need to re-calculate dependencies.
25062564
# NOTE: see comment below for why we skip this in fine-grained mode.
25072565
if exist_added_packages(suppressed, manager):
2508-
state.parse_file() # This is safe because the cache is anyway stale.
2509-
state.compute_dependencies()
2566+
state.needs_parse = True # This is safe because the cache is anyway stale.
25102567
# This is an inverse to the situation above. If we had an import like this:
25112568
# from pkg import mod
25122569
# and then mod was deleted, we need to force recompute dependencies, to
@@ -2515,8 +2572,7 @@ def new_state(
25152572
# import pkg
25162573
# import pkg.mod
25172574
if exist_removed_submodules(dependencies, manager):
2518-
state.parse_file() # Same as above, the current state is stale anyway.
2519-
state.compute_dependencies()
2575+
state.needs_parse = True # Same as above, the current state is stale anyway.
25202576
state.size_hint = meta.size
25212577
else:
25222578
# When doing a fine-grained cache load, pretend we only
@@ -2526,14 +2582,17 @@ def new_state(
25262582
manager.log(f"Deferring module to fine-grained update {path} ({id})")
25272583
raise ModuleNotFound
25282584

2529-
# Parse the file (and then some) to get the dependencies.
2530-
state.parse_file(temporary=temporary)
2531-
state.compute_dependencies()
2532-
if manager.workers and state.tree:
2533-
# We don't need imports in coordinator process anymore, we parse only to
2534-
# compute dependencies.
2535-
state.tree.imports = []
2536-
del manager.ast_cache[id]
2585+
if temporary:
2586+
# Eagerly parse temporary states; they are rarely needed.
2587+
state.parse_file(temporary=True)
2588+
state.compute_dependencies()
2589+
if state.manager.workers and state.tree:
2590+
# We don't need imports in coordinator process anymore, we parse only to
2591+
# compute dependencies.
2592+
state.tree.imports = []
2593+
del state.manager.ast_cache[state.id]
2594+
else:
2595+
state.needs_parse = True
25372596

25382597
return state
25392598

@@ -2596,6 +2655,8 @@ def __init__(
25962655
# Pre-computed opaque value of suppressed_deps_opts() used
25972656
# to minimize amount of data sent to parallel workers.
25982657
self.known_suppressed_deps_opts: bytes | None = None
2658+
# An internal flag used by the build manager to schedule states for parsing.
2659+
self.needs_parse = False
25992660

26002661
def write(self, buf: WriteBuffer) -> None:
26012662
"""Serialize State for sending to build worker.
@@ -2819,26 +2880,9 @@ def fix_cross_refs(self) -> None:
28192880

28202881
# Methods for processing modules from source code.
28212882

2822-
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
2823-
"""Parse file and run first pass of semantic analysis.
2824-
2825-
Everything done here is local to the file. Don't depend on imported
2826-
modules in any way. Also record module dependencies based on imports.
2827-
"""
2828-
if self.tree is not None:
2829-
# The file was already parsed (in __init__()).
2830-
return
2831-
2883+
def get_source(self) -> str:
2884+
"""Get module source and parse inline mypy configurations."""
28322885
manager = self.manager
2833-
2834-
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
2835-
cached = self.id in manager.ast_cache
2836-
modules = manager.modules
2837-
if not cached:
2838-
manager.log(f"Parsing {self.xpath} ({self.id})")
2839-
else:
2840-
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
2841-
28422886
t0 = time_ref()
28432887

28442888
with self.wrap_context():
@@ -2880,33 +2924,53 @@ def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None =
28802924
self.check_for_invalid_options()
28812925

28822926
self.size_hint = len(source)
2883-
if not cached:
2884-
ignore_errors = self.ignore_all or self.options.ignore_errors
2885-
self.tree = manager.parse_file(
2886-
self.id,
2887-
self.xpath,
2888-
source,
2889-
ignore_errors=ignore_errors,
2890-
options=self.options,
2891-
raw_data=raw_data,
2892-
)
2893-
else:
2894-
# Reuse a cached AST
2895-
self.tree = manager.ast_cache[self.id][0]
2927+
self.time_spent_us += time_spent_us(t0)
2928+
return source
28962929

2930+
def parse_file_inner(self, source: str, raw_data: FileRawData | None = None) -> None:
2931+
t0 = time_ref()
2932+
self.manager.log(f"Parsing {self.xpath} ({self.id})")
2933+
with self.wrap_context():
2934+
ignore_errors = self.ignore_all or self.options.ignore_errors
2935+
self.tree = self.manager.parse_file(
2936+
self.id,
2937+
self.xpath,
2938+
source,
2939+
ignore_errors=ignore_errors,
2940+
options=self.options,
2941+
raw_data=raw_data,
2942+
)
28972943
self.time_spent_us += time_spent_us(t0)
28982944

2899-
if not cached:
2945+
def parse_file(self, *, temporary: bool = False, raw_data: FileRawData | None = None) -> None:
2946+
"""Parse file and run first pass of semantic analysis.
2947+
2948+
Everything done here is local to the file. Don't depend on imported
2949+
modules in any way. Logic here should be kept in sync with BuildManager.parse_all().
2950+
"""
2951+
self.needs_parse = False
2952+
if self.tree is not None:
2953+
# The file was already parsed.
2954+
return
2955+
2956+
source = self.get_source()
2957+
manager = self.manager
2958+
# Can we reuse a previously parsed AST? This avoids redundant work in daemon.
2959+
if self.id not in manager.ast_cache:
2960+
self.parse_file_inner(source, raw_data)
29002961
# Make a copy of any errors produced during parse time so that
29012962
# fine-grained mode can repeat them when the module is
29022963
# reprocessed.
29032964
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
29042965
self.semantic_analysis_pass1()
29052966
else:
2906-
self.early_errors = manager.ast_cache[self.id][1]
2967+
# Reuse a cached AST
2968+
manager.log(f"Using cached AST for {self.xpath} ({self.id})")
2969+
self.tree, self.early_errors = manager.ast_cache[self.id]
29072970

2971+
assert self.tree is not None
29082972
if not temporary:
2909-
modules[self.id] = self.tree
2973+
manager.modules[self.id] = self.tree
29102974
self.check_blockers()
29112975

29122976
manager.ast_cache[self.id] = (self.tree, self.early_errors)
@@ -3912,14 +3976,23 @@ def load_graph(
39123976
graph[st.id] = st
39133977
new.append(st)
39143978
entry_points.add(bs.module)
3979+
manager.parse_all([state for state in new if state.needs_parse])
39153980

39163981
# Note: Running this each time could be slow in the daemon. If it's a problem, we
39173982
# can do more work to maintain this incrementally.
39183983
seen_files = {st.abspath: st for st in graph.values() if st.path}
39193984

39203985
# Collect dependencies. We go breadth-first.
39213986
# More nodes might get added to new as we go, but that's fine.
3987+
ready = set(new)
3988+
not_ready: set[State] = set()
39223989
for st in new:
3990+
if st not in ready:
3991+
# We have run out of states; parse all we have.
3992+
assert st in not_ready
3993+
manager.parse_all(not_ready)
3994+
ready |= not_ready
3995+
not_ready.clear()
39233996
assert st.ancestors is not None
39243997
# Strip out indirect dependencies. These will be dealt with
39253998
# when they show up as direct dependencies, and there's a
@@ -3975,6 +4048,7 @@ def load_graph(
39754048
newst_path = newst.abspath
39764049

39774050
if newst_path in seen_files:
4051+
manager.errors.set_file(newst.xpath, newst.id, manager.options)
39784052
manager.error(
39794053
None,
39804054
"Source file found twice under different module names: "
@@ -3995,6 +4069,10 @@ def load_graph(
39954069
assert newst.id not in graph, newst.id
39964070
graph[newst.id] = newst
39974071
new.append(newst)
4072+
if newst.needs_parse:
4073+
not_ready.add(newst)
4074+
else:
4075+
ready.add(newst)
39984076
# There are two things we need to do after the initial load loop. One is up-suppress
39994077
# modules that are back in graph. We need to do this after the loop to cover edge cases
40004078
# like where a namespace package ancestor is shared by a typed and an untyped package.

mypy/nativeparse.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from __future__ import annotations
2121

2222
import os
23+
import time
2324
from typing import Any, Final, cast
2425

2526
import ast_serialize # type: ignore[import-untyped, import-not-found, unused-ignore]
@@ -273,6 +274,10 @@ def read_statements(state: State, data: ReadBuffer, n: int) -> list[Statement]:
273274
def parse_to_binary_ast(
274275
filename: str, options: Options, skip_function_bodies: bool = False
275276
) -> tuple[bytes, list[dict[str, Any]], TypeIgnores, bytes, bool, bool]:
277+
# This is a horrible hack to work around a mypyc bug where imported
278+
# module may not be ready in a thread sometimes.
279+
while ast_serialize is None:
280+
time.sleep(0.0001) # type: ignore[unreachable]
276281
ast_bytes, errors, ignores, import_bytes, ast_data = ast_serialize.parse(
277282
filename,
278283
skip_function_bodies=skip_function_bodies,

mypy/test/testgraph.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def test_sorted_components(self) -> None:
117117
"c": State.new_state("c", None, "import b, d", manager),
118118
"builtins": State.new_state("builtins", None, "", manager),
119119
}
120+
manager.parse_all(graph.values())
120121
res = [scc.mod_ids for scc in sorted_components(graph)]
121122
assert_equal(res, [{"builtins"}, {"d"}, {"c", "b"}, {"a"}])
122123

@@ -129,6 +130,7 @@ def test_order_ascc(self) -> None:
129130
"c": State.new_state("c", None, "import b, d", manager),
130131
"builtins": State.new_state("builtins", None, "", manager),
131132
}
133+
manager.parse_all(graph.values())
132134
res = [scc.mod_ids for scc in sorted_components(graph)]
133135
assert_equal(res, [{"builtins"}, {"a", "d", "c", "b"}])
134136
ascc = res[1]

0 commit comments

Comments
 (0)