Skip to content

Commit 70de10e

Browse files
authored
Send early errors to workers (#20822)
Ref #933. When parallel parsing is ready, we will not run `load_graph()` in each worker, so we need to send early errors (mostly import errors) to the relevant workers. A couple of notes here: * Now `only_once` messages are shown only once per worker. This is hard to fix, and I don't think we should really bother: it doesn't affect correctness, and may only slightly increase noise. By the way, they were never truly `only_once` — when reading errors from the cache we can get multiple occurrences of the same error/note. * While I am at it, I switch a few more errors to the standard `file.py: error: some message` format.
1 parent df6922d commit 70de10e

12 files changed

Lines changed: 218 additions & 88 deletions

mypy/build.py

Lines changed: 92 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ def default_flush_errors(
399399
finally:
400400
for worker in workers:
401401
try:
402-
send(worker.conn, SccRequestMessage(scc_id=None))
402+
send(worker.conn, SccRequestMessage(scc_id=None, import_errors={}))
403403
except (OSError, IPCException):
404404
pass
405405
for worker in workers:
@@ -437,6 +437,9 @@ def build_inner(
437437
source_set = BuildSourceSet(sources)
438438
cached_read = fscache.read
439439
errors = Errors(options, read_source=lambda path: read_py_file(path, cached_read))
440+
# Record import errors so that they can be replayed by the workers.
441+
if workers:
442+
errors.global_watcher = True
440443
plugin, snapshot = load_plugins(options, errors, stdout, extra_plugins)
441444

442445
# Validate error codes after plugins are loaded.
@@ -904,6 +907,10 @@ def __init__(
904907
self.import_options: dict[str, bytes] = {}
905908
# Cache for transitive dependency check (expensive).
906909
self.transitive_deps_cache: dict[tuple[int, int], bool] = {}
910+
# Resolved paths for each module in build.
911+
self.path_by_id: dict[str, str] = {}
912+
# Packages for which we know presence or absence of __getattr__().
913+
self.known_partial_packages: dict[str, bool] = {}
907914

908915
def dump_stats(self) -> None:
909916
if self.options.dump_build_stats:
@@ -1045,8 +1052,6 @@ def parse_file(
10451052
if self.errors.is_blockers():
10461053
self.log("Bailing due to parse errors")
10471054
self.errors.raise_error()
1048-
1049-
self.errors.set_file_ignored_lines(path, tree.ignored_lines, ignore_errors)
10501055
return tree
10511056

10521057
def load_fine_grained_deps(self, id: str) -> dict[str, set[str]]:
@@ -1118,7 +1123,15 @@ def submit_to_workers(self, sccs: list[SCC] | None = None) -> None:
11181123
while self.scc_queue and self.free_workers:
11191124
idx = self.free_workers.pop()
11201125
_, _, scc = heappop(self.scc_queue)
1121-
send(self.workers[idx].conn, SccRequestMessage(scc_id=scc.id))
1126+
import_errors = {
1127+
mod_id: self.errors.recorded[path]
1128+
for mod_id in scc.mod_ids
1129+
if (path := self.path_by_id[mod_id]) in self.errors.recorded
1130+
}
1131+
send(
1132+
self.workers[idx].conn,
1133+
SccRequestMessage(scc_id=scc.id, import_errors=import_errors),
1134+
)
11221135

11231136
def wait_for_done(
11241137
self, graph: Graph
@@ -2399,8 +2412,10 @@ def new_state(
23992412
state.compute_dependencies()
24002413
if manager.workers:
24012414
# We don't need parsed trees in coordinator process, we parse only to
2402-
# compute dependencies.
2403-
state.tree = None
2415+
# compute dependencies. Keep temporary tree until the caller uses it
2416+
if not temporary:
2417+
state.tree = None
2418+
del manager.modules[id]
24042419
del manager.ast_cache[id]
24052420

24062421
return state
@@ -2533,7 +2548,8 @@ def read(cls, buf: ReadBuffer, manager: BuildManager) -> State:
25332548
id=id,
25342549
path=path,
25352550
source=source,
2536-
options=manager.options.clone_for_module(id),
2551+
# The caller must call clone_for_module().
2552+
options=manager.options,
25372553
ignore_all=ignore_all,
25382554
caller_line=caller_line,
25392555
import_context=import_context,
@@ -2721,7 +2737,7 @@ def parse_file(self, *, temporary: bool = False) -> None:
27212737
assert ioerr.errno is not None
27222738
raise CompileError(
27232739
[
2724-
"mypy: can't read file '{}': {}".format(
2740+
"mypy: error: cannot read file '{}': {}".format(
27252741
self.path.replace(os.getcwd() + os.sep, ""),
27262742
os.strerror(ioerr.errno),
27272743
)
@@ -2730,9 +2746,9 @@ def parse_file(self, *, temporary: bool = False) -> None:
27302746
) from ioerr
27312747
except (UnicodeDecodeError, DecodeError) as decodeerr:
27322748
if self.path.endswith(".pyd"):
2733-
err = f"mypy: stubgen does not support .pyd files: '{self.path}'"
2749+
err = f"{self.path}: error: stubgen does not support .pyd files"
27342750
else:
2735-
err = f"mypy: can't decode file '{self.path}': {str(decodeerr)}"
2751+
err = f"{self.path}: error: cannot decode file: {str(decodeerr)}"
27362752
raise CompileError([err], module_with_blocker=self.id) from decodeerr
27372753
elif self.path and self.manager.fscache.isdir(self.path):
27382754
source = ""
@@ -2746,22 +2762,13 @@ def parse_file(self, *, temporary: bool = False) -> None:
27462762

27472763
self.size_hint = len(source)
27482764
if not cached:
2765+
ignore_errors = self.ignore_all or self.options.ignore_errors
27492766
self.tree = manager.parse_file(
2750-
self.id,
2751-
self.xpath,
2752-
source,
2753-
ignore_errors=self.ignore_all or self.options.ignore_errors,
2754-
options=self.options,
2767+
self.id, self.xpath, source, ignore_errors=ignore_errors, options=self.options
27552768
)
2756-
27572769
else:
27582770
# Reuse a cached AST
27592771
self.tree = manager.ast_cache[self.id][0]
2760-
manager.errors.set_file_ignored_lines(
2761-
self.xpath,
2762-
self.tree.ignored_lines,
2763-
self.ignore_all or self.options.ignore_errors,
2764-
)
27652772

27662773
self.time_spent_us += time_spent_us(t0)
27672774

@@ -2770,19 +2777,23 @@ def parse_file(self, *, temporary: bool = False) -> None:
27702777
# fine-grained mode can repeat them when the module is
27712778
# reprocessed.
27722779
self.early_errors = list(manager.errors.error_info_map.get(self.xpath, []))
2780+
self.semantic_analysis_pass1()
27732781
else:
27742782
self.early_errors = manager.ast_cache[self.id][1]
27752783

27762784
if not temporary:
27772785
modules[self.id] = self.tree
2778-
2779-
if not cached:
2780-
self.semantic_analysis_pass1()
2781-
2782-
if not temporary:
27832786
self.check_blockers()
27842787

27852788
manager.ast_cache[self.id] = (self.tree, self.early_errors)
2789+
self.setup_errors()
2790+
2791+
def setup_errors(self) -> None:
2792+
assert self.tree is not None
2793+
self.manager.errors.set_file_ignored_lines(
2794+
self.xpath, self.tree.ignored_lines, self.ignore_all or self.options.ignore_errors
2795+
)
2796+
self.manager.errors.set_skipped_lines(self.xpath, self.tree.skipped_lines)
27862797

27872798
def parse_inline_configuration(self, source: str) -> None:
27882799
"""Check for inline mypy: options directive and parse them."""
@@ -2821,7 +2832,6 @@ def semantic_analysis_pass1(self) -> None:
28212832
analyzer = SemanticAnalyzerPreAnalysis()
28222833
with self.wrap_context():
28232834
analyzer.visit_file(self.tree, self.xpath, self.id, options)
2824-
self.manager.errors.set_skipped_lines(self.xpath, self.tree.skipped_lines)
28252835
# TODO: Do this while constructing the AST?
28262836
self.tree.names = SymbolTable()
28272837
if not self.tree.is_stub:
@@ -3362,23 +3372,28 @@ def in_partial_package(id: str, manager: BuildManager) -> bool:
33623372
defines a module-level __getattr__ (a.k.a. partial stub package).
33633373
"""
33643374
while "." in id:
3365-
parent, _ = id.rsplit(".", 1)
3366-
if parent in manager.modules:
3367-
parent_mod: MypyFile | None = manager.modules[parent]
3375+
ancestor, _ = id.rsplit(".", 1)
3376+
if ancestor in manager.known_partial_packages:
3377+
return manager.known_partial_packages[ancestor]
3378+
if ancestor in manager.modules:
3379+
ancestor_mod: MypyFile | None = manager.modules[ancestor]
33683380
else:
3369-
# Parent is not in build, try quickly if we can find it.
3381+
# Ancestor is not in build, try quickly if we can find it.
33703382
try:
3371-
parent_st = State.new_state(
3372-
id=parent, path=None, source=None, manager=manager, temporary=True
3383+
ancestor_st = State.new_state(
3384+
id=ancestor, path=None, source=None, manager=manager, temporary=True
33733385
)
33743386
except (ModuleNotFound, CompileError):
3375-
parent_mod = None
3387+
ancestor_mod = None
33763388
else:
3377-
parent_mod = parent_st.tree
3378-
if parent_mod is not None:
3389+
ancestor_mod = ancestor_st.tree
3390+
# We will not need this anymore.
3391+
ancestor_st.tree = None
3392+
if ancestor_mod is not None:
33793393
# Bail out soon, complete subpackage found
3380-
return parent_mod.is_partial_stub_package
3381-
id = parent
3394+
manager.known_partial_packages[ancestor] = ancestor_mod.is_partial_stub_package
3395+
return ancestor_mod.is_partial_stub_package
3396+
id = ancestor
33823397
return False
33833398

33843399

@@ -3537,7 +3552,7 @@ def dispatch(sources: list[BuildSource], manager: BuildManager, stdout: TextIO)
35373552
initial_gc_freeze_done = True
35383553

35393554
for id in graph:
3540-
manager.import_map[id] = set(graph[id].dependencies + graph[id].suppressed)
3555+
manager.import_map[id] = graph[id].dependencies_set
35413556

35423557
t1 = time.time()
35433558
manager.add_stats(
@@ -3839,6 +3854,8 @@ def load_graph(
38393854
if dep not in graph:
38403855
st.suppress_dependency(dep)
38413856
manager.plugin.set_modules(manager.modules)
3857+
manager.path_by_id = {id: graph[id].xpath for id in graph}
3858+
manager.errors.global_watcher = False
38423859
return graph
38433860

38443861

@@ -3966,7 +3983,9 @@ def find_stale_sccs(
39663983
def process_graph(graph: Graph, manager: BuildManager) -> None:
39673984
"""Process everything in dependency order."""
39683985
# Broadcast graph to workers before computing SCCs to save a bit of time.
3969-
graph_message = GraphMessage(graph=graph)
3986+
# TODO: check if we can optimize by sending only part of the graph needed for given SCC.
3987+
# For example only send modules in the SCC and their dependencies.
3988+
graph_message = GraphMessage(graph=graph, missing_modules=set(manager.missing_modules))
39703989
buf = WriteBuffer()
39713990
graph_message.write(buf)
39723991
graph_data = buf.getvalue()
@@ -4108,7 +4127,7 @@ def process_fresh_modules(graph: Graph, modules: list[str], manager: BuildManage
41084127

41094128

41104129
def process_stale_scc(
4111-
graph: Graph, ascc: SCC, manager: BuildManager
4130+
graph: Graph, ascc: SCC, manager: BuildManager, from_cache: set[str] | None = None
41124131
) -> dict[str, tuple[str, list[str]]]:
41134132
"""Process the modules in one SCC from source code."""
41144133
# First verify if all transitive dependencies are loaded in the current process.
@@ -4173,7 +4192,9 @@ def process_stale_scc(
41734192
stale = scc
41744193
for id in stale:
41754194
# Re-generate import errors in case this module was loaded from the cache.
4176-
if graph[id].meta:
4195+
# Deserialized states all have meta=None, so the caller should specify
4196+
# explicitly which of them are from cache.
4197+
if graph[id].meta or from_cache and id in from_cache:
41774198
graph[id].verify_dependencies(suppressed_only=True)
41784199
# We may already have parsed the module, or not.
41794200
# If the former, parse_file() is a no-op.
@@ -4436,17 +4457,30 @@ class SccRequestMessage(IPCMessage):
44364457
If scc_id is None, then it means that the coordinator requested a shutdown.
44374458
"""
44384459

4439-
def __init__(self, *, scc_id: int | None) -> None:
4460+
def __init__(self, *, scc_id: int | None, import_errors: dict[str, list[ErrorInfo]]) -> None:
44404461
self.scc_id = scc_id
4462+
self.import_errors = import_errors
44414463

44424464
@classmethod
44434465
def read(cls, buf: ReadBuffer) -> SccRequestMessage:
44444466
assert read_tag(buf) == SCC_REQUEST_MESSAGE
4445-
return SccRequestMessage(scc_id=read_int_opt(buf))
4467+
return SccRequestMessage(
4468+
scc_id=read_int_opt(buf),
4469+
import_errors={
4470+
read_str(buf): [ErrorInfo.read(buf) for _ in range(read_int_bare(buf))]
4471+
for _ in range(read_int_bare(buf))
4472+
},
4473+
)
44464474

44474475
def write(self, buf: WriteBuffer) -> None:
44484476
write_tag(buf, SCC_REQUEST_MESSAGE)
44494477
write_int_opt(buf, self.scc_id)
4478+
write_int_bare(buf, len(self.import_errors))
4479+
for path, errors in self.import_errors.items():
4480+
write_str(buf, path)
4481+
write_int_bare(buf, len(errors))
4482+
for error in errors:
4483+
error.write(buf)
44504484

44514485

44524486
class SccResponseMessage(IPCMessage):
@@ -4570,19 +4604,31 @@ def write(self, buf: WriteBuffer) -> None:
45704604
class GraphMessage(IPCMessage):
45714605
"""A message wrapping the build graph computed by the coordinator."""
45724606

4573-
def __init__(self, *, graph: Graph) -> None:
4607+
def __init__(self, *, graph: Graph, missing_modules: set[str]) -> None:
45744608
self.graph = graph
4609+
self.missing_modules = missing_modules
4610+
# Send this data separately as it will be lost during state serialization.
4611+
self.from_cache = {mod_id for mod_id in graph if graph[mod_id].meta}
45754612

45764613
@classmethod
45774614
def read(cls, buf: ReadBuffer, manager: BuildManager | None = None) -> GraphMessage:
45784615
assert manager is not None
45794616
assert read_tag(buf) == GRAPH_MESSAGE
45804617
graph = {read_str_bare(buf): State.read(buf, manager) for _ in range(read_int_bare(buf))}
4581-
return GraphMessage(graph=graph)
4618+
missing_modules = {read_str_bare(buf) for _ in range(read_int_bare(buf))}
4619+
message = GraphMessage(graph=graph, missing_modules=missing_modules)
4620+
message.from_cache = {read_str_bare(buf) for _ in range(read_int_bare(buf))}
4621+
return message
45824622

45834623
def write(self, buf: WriteBuffer) -> None:
45844624
write_tag(buf, GRAPH_MESSAGE)
45854625
write_int_bare(buf, len(self.graph))
45864626
for mod_id, state in self.graph.items():
45874627
write_str_bare(buf, mod_id)
45884628
state.write(buf)
4629+
write_int_bare(buf, len(self.missing_modules))
4630+
for module in self.missing_modules:
4631+
write_str_bare(buf, module)
4632+
write_int_bare(buf, len(self.from_cache))
4633+
for module in self.from_cache:
4634+
write_str_bare(buf, module)

mypy/build_worker/worker.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ def main(argv: list[str]) -> None:
112112

113113

114114
def serve(server: IPCServer, ctx: ServerContext) -> None:
115+
"""Main server loop of the worker.
116+
117+
Receive initial state from the coordinator, then process each
118+
SCC checking request and reply to client (coordinator). See module
119+
docstring for more details on the protocol.
120+
"""
115121
sources = SourcesDataMessage.read(receive(server)).sources
116122
manager = setup_worker_manager(sources, ctx)
117123
if manager is None:
@@ -130,13 +136,18 @@ def serve(server: IPCServer, ctx: ServerContext) -> None:
130136
gc.unfreeze()
131137
gc.enable()
132138
for id in graph:
133-
manager.import_map[id] = set(graph[id].dependencies + graph[id].suppressed)
139+
manager.import_map[id] = graph[id].dependencies_set
140+
# Ignore errors during local graph loading to check that receiving
141+
# early errors from coordinator works correctly.
142+
manager.errors.reset()
134143

135144
# Notify worker we are done loading graph.
136145
send(server, AckMessage())
137146

138147
# Compare worker graph and coordinator, with parallel parser we will only use the latter.
139-
coordinator_graph = GraphMessage.read(receive(server), manager).graph
148+
graph_data = GraphMessage.read(receive(server), manager)
149+
assert set(manager.missing_modules) == graph_data.missing_modules
150+
coordinator_graph = graph_data.graph
140151
assert coordinator_graph.keys() == graph.keys()
141152
for id in graph:
142153
assert graph[id].dependencies_set == coordinator_graph[id].dependencies_set
@@ -150,14 +161,29 @@ def serve(server: IPCServer, ctx: ServerContext) -> None:
150161
# Notify coordinator we are ready to process SCCs.
151162
send(server, AckMessage())
152163
while True:
153-
scc_id = SccRequestMessage.read(receive(server)).scc_id
164+
scc_message = SccRequestMessage.read(receive(server))
165+
scc_id = scc_message.scc_id
154166
if scc_id is None:
155167
manager.dump_stats()
156168
break
157169
scc = manager.scc_by_id[scc_id]
158170
t0 = time.time()
159171
try:
160-
result = process_stale_scc(graph, scc, manager)
172+
for id in scc.mod_ids:
173+
state = graph[id]
174+
# Extra if below is needed only because we are using local graph.
175+
# TODO: clone options when switching to coordinator graph.
176+
if state.tree is None:
177+
# Parse early to get errors related data, such as ignored
178+
# and skipped lines before replaying the errors.
179+
state.parse_file()
180+
else:
181+
state.setup_errors()
182+
if id in scc_message.import_errors:
183+
manager.errors.set_file(state.xpath, id, state.options)
184+
for err_info in scc_message.import_errors[id]:
185+
manager.errors.add_error_info(err_info)
186+
result = process_stale_scc(graph, scc, manager, from_cache=graph_data.from_cache)
161187
# We must commit after each SCC, otherwise we break --sqlite-cache.
162188
manager.metastore.commit()
163189
except CompileError as blocker:

mypy/errorcodes.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,6 @@ def __hash__(self) -> int:
284284
# Syntax errors are often blocking.
285285
SYNTAX: Final = ErrorCode("syntax", "Report syntax errors", "General")
286286

287-
# This is an internal marker code for a whole-file ignore. It is not intended to
288-
# be user-visible.
289-
FILE: Final = ErrorCode("file", "Internal marker for a whole file being ignored", "General")
290-
del error_codes[FILE.code]
291-
292287
# This is a catch-all for remaining uncategorized errors.
293288
MISC: Final = ErrorCode("misc", "Miscellaneous other checks", "General")
294289

0 commit comments

Comments
 (0)