-
Notifications
You must be signed in to change notification settings - Fork 333
fix(visualserver): contain visual worker failures #1347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
sufubao
wants to merge
1
commit into
ModelTC:main
Choose a base branch
from
sufubao:backport-visual-worker-failures
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import zmq | ||
| import zmq.asyncio | ||
| import asyncio | ||
| import concurrent.futures | ||
| import uvloop | ||
| import rpyc | ||
| import socket | ||
|
|
@@ -27,6 +28,40 @@ | |
| logger = init_logger(__name__) | ||
|
|
||
|
|
||
class VisualInferResult:
    """Per-image visual inference outcome.

    Combines completion signaling with success/failure status so the manager can
    distinguish "embed is in cache, OK to forward" from "ViT failed, abort the
    request". Workers call ``mark_success`` / ``mark_failure`` (the manager
    receives the call via the RPyC netref) and the manager waits + inspects
    ``success`` after ``wait()`` returns.

    Before this object was introduced, workers only signaled a bare
    ``threading.Event`` on both success and failure, so the manager forwarded
    failed requests to the router with missing embeddings (2026-05-09 incident).

    Failure is sticky: once ``mark_failure`` has been called, a later (or
    concurrent) ``mark_success`` on the same object cannot flip ``success``
    back to ``True``. The same result object may be handed to more than one
    worker, so a single failure report must not be masked by another worker's
    success report.
    """

    __slots__ = ("event", "success", "error", "_lock")

    def __init__(self):
        self.event = threading.Event()
        self.success = False
        self.error: str = ""
        # Serializes the success/error transition so that concurrent
        # mark_success / mark_failure calls cannot leave success=True with an
        # error recorded.
        self._lock = threading.Lock()

    def mark_success(self):
        """Record success (unless a failure was already recorded) and wake waiters."""
        with self._lock:
            # ``error`` is non-empty only after mark_failure; failure wins.
            if not self.error:
                self.success = True
        self.event.set()

    def mark_failure(self, error: str = ""):
        """Record a failure reason, force ``success`` to False, and wake waiters.

        ``error`` falls back to a previously recorded reason, then to
        ``"unknown"``, so manager logs always carry a non-empty reason.
        """
        with self._lock:
            self.success = False
            self.error = error or self.error or "unknown"
        self.event.set()

    def wait(self, timeout=None):
        """Block until a mark_* call or ``timeout`` seconds elapse.

        Returns True if the result was signaled, False on timeout
        (the return value of ``threading.Event.wait``).
        """
        return self.event.wait(timeout)
|
|
||
|
|
||
| class VisualManager: | ||
| def __init__( | ||
| self, | ||
|
|
@@ -48,8 +83,24 @@ def __init__( | |
|
|
||
| self.zmq_recv_socket = context.socket(zmq.PULL) | ||
| self.zmq_recv_socket.bind(f"{args.zmq_mode}127.0.0.1:{args.visual_port}") | ||
| self.cache_client = rpyc.connect("localhost", args.cache_port, config={"allow_pickle": True}) | ||
| # sync_request_timeout 让阻塞的 RPyC 调用 (get_items_embed/set_items_embed) 从 socket | ||
| # 层真正抛 TimeoutError, 避免泄漏 default executor 线程导致 net-io 饿死。 | ||
| cache_rpyc_timeout = max( | ||
| int(getattr(args, "visual_get_items_embed_timeout", 0) or 0), | ||
| int(getattr(args, "visual_set_items_embed_timeout", 0) or 0), | ||
| 10, | ||
| ) | ||
| self.cache_client = rpyc.connect( | ||
| "localhost", | ||
| args.cache_port, | ||
| config={"allow_pickle": True, "sync_request_timeout": cache_rpyc_timeout}, | ||
| ) | ||
| self.cache_client._channel.stream.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) | ||
| # 专用 executor: 同步 cache RPC 都走这个池, 与 asyncio default executor 隔离, | ||
| # 即便某次 cache 调用 hang (sync_request_timeout 兜底前) 也不会饿死 zmq recv_pyobj。 | ||
| self._cache_executor = concurrent.futures.ThreadPoolExecutor( | ||
| max_workers=2, thread_name_prefix="visual_cache_rpc" | ||
| ) | ||
| self.model_weightdir = args.model_dir | ||
| self.vit_dp = args.visual_dp | ||
| self.vit_tp = args.visual_tp | ||
|
|
@@ -60,12 +111,10 @@ def __init__( | |
| self.lock = asyncio.Lock() | ||
|
|
||
| async def wait_to_model_ready(self): | ||
|
|
||
| self.model_rpcs: List[List[VisualModelRpcClient]] = [[] for _ in range(self.vit_dp)] | ||
| self.vit_attn_backend = init_vit_att_backend(index=0) | ||
| for dp_rank_id in range(self.vit_dp): | ||
| for tp_rank_id in range(self.vit_tp): | ||
|
|
||
| rpc_model = await start_model_process() | ||
| self.model_rpcs[dp_rank_id].append(rpc_model) | ||
|
|
||
|
|
@@ -92,6 +141,12 @@ async def wait_to_model_ready(self): | |
| return | ||
|
|
||
| def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[ImageItem]: | ||
| """同步路径: 检查 shm req 是否 aborted, 必要时调用 embed cache 询问哪些 image 已就绪。 | ||
|
|
||
| ``cache_client.root.get_items_embed`` 是同步 RPyC 调用; 调用方需要包一层 | ||
| ``asyncio.to_thread`` 并加超时, 避免 embed cache 卡死时阻塞 asyncio 事件循环 | ||
| (2026-05-09 incident)。 | ||
| """ | ||
|
Comment on lines
143
to
+149
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To ensure thread safety when calling `get_items_embed`, hold `self._cache_lock` around the RPyC call:
with self._cache_lock:
    ready_image = obtain(self.cache_client.root.get_items_embed(img_uuids))
||
| shm_req = self.shm_req_manager.get_req_obj_by_index(group_req_indexes.shm_req_indexes[0]) | ||
| is_aborted = shm_req.is_aborted | ||
| disable_prompt_cache = shm_req.sample_params.disable_prompt_cache | ||
|
|
@@ -122,15 +177,37 @@ def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[Imag | |
| return images_need_infer | ||
|
|
||
| async def handle_group_indexes(self, group_req_indexes: GroupReqIndexes): | ||
| images_need_infer = self.get_need_infer_images(group_req_indexes) | ||
| # 把 get_need_infer_images 也放在 try 内, 并通过 dedicated executor + wait_for 加超时, | ||
| # 因为内部包含同步 RPyC 调用 (cache_client.root.get_items_embed); embed cache 卡死时 | ||
| # 不应阻塞 asyncio 事件循环或绕过 abort 路径 (2026-05-09 incident)。 | ||
| # 关键: 用 _cache_executor 而不是 default executor, 否则卡死的 cache 调用会占满 | ||
| # default executor 进而饿死 loop_for_netio_req 里的 zmq_recv_socket.recv_pyobj。 | ||
| cache_timeout = max(int(getattr(self.args, "visual_get_items_embed_timeout", 0) or 0), 0) or 10 | ||
| loop = asyncio.get_running_loop() | ||
| try: | ||
| images_need_infer = await asyncio.wait_for( | ||
| loop.run_in_executor(self._cache_executor, self.get_need_infer_images, group_req_indexes), | ||
| timeout=cache_timeout, | ||
| ) | ||
|
|
||
| if len(images_need_infer) == 0: | ||
| self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) | ||
| return | ||
|
|
||
| if len(images_need_infer) == 0: | ||
| self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) | ||
| return | ||
| else: | ||
| await self.handle_images(images_need_infer) | ||
| self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) | ||
| return | ||
| except Exception as e: | ||
| # visual 推理失败 (例如 worker 异常 / event 等待超时 / get_items_embed 卡住), | ||
| # 把请求标记为 aborted 再转发, 由下游 router 正常走 abort 释放路径。 | ||
| logger.exception( | ||
| f"handle_group_indexes failed, marking group_req_id={group_req_indexes.group_req_id} aborted: {e}" | ||
| ) | ||
| for shm_req_index in group_req_indexes.shm_req_indexes: | ||
| shm_req = self.shm_req_manager.get_req_obj_by_index(shm_req_index) | ||
| shm_req.is_aborted = True | ||
| self.shm_req_manager.put_back_req_obj(shm_req) | ||
|
|
||
| self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) | ||
| return | ||
|
|
||
| async def handle_images(self, images_need_infer: List[ImageItem]): | ||
| if not hasattr(self, "cur_dp_index"): | ||
|
|
@@ -140,14 +217,14 @@ async def handle_images(self, images_need_infer: List[ImageItem]): | |
| for image in images_need_infer: | ||
| self.cur_dp_index += 1 | ||
| select_dp = self.cur_dp_index % self.vit_dp | ||
| dp_to_handle_images[select_dp].append((image, threading.Event())) | ||
| dp_to_handle_images[select_dp].append((image, VisualInferResult())) | ||
|
|
||
| taskes = [] | ||
| for dp_index in range(self.vit_dp): | ||
| _images = dp_to_handle_images[dp_index] | ||
| if _images: | ||
| taskes.append( | ||
| self.infer_images(dp_index, images=[e[0] for e in _images], events=[e[1] for e in _images]) | ||
| self.infer_images(dp_index, images=[e[0] for e in _images], results=[e[1] for e in _images]) | ||
| ) | ||
|
|
||
| async with self.lock: | ||
|
|
@@ -157,17 +234,39 @@ async def handle_images(self, images_need_infer: List[ImageItem]): | |
| logger.exception(str(e)) | ||
| raise e | ||
|
|
||
| # 等待推理通知已经 ok | ||
| # 等待每张图片各自的完成事件 + 检查成功状态。 | ||
| # event.wait() 必须有 timeout, 否则 ViT worker 异常退出 / cache 卡死时, 这里 | ||
| # 永远不会返回, 同时被 asyncio.to_thread 占用的 default executor 线程也会被耗尽 | ||
| # (2026-05-09 incident)。 | ||
| # | ||
| # 关键: 即使发现有失败 image, 也要先把同 batch 中其他 image 都等到 (success 或 timeout), | ||
| # 再统一抛出。否则当一张 preprocess_failed 的图片先触发 mark_failure 时, 我们立刻 | ||
| # raise → 上层走 abort → 下游释放 multimodal cache id, 但 store_worker 此刻还在 | ||
| # 给同 batch 中成功的 image 写 embedding, 造成写到已释放槽位的竞态。 | ||
| wait_timeout = max(int(getattr(self.args, "visual_infer_timeout", 0) or 0), 0) or 120 | ||
| errors: List[str] = [] | ||
| for dp_index in range(self.vit_dp): | ||
| _images = dp_to_handle_images[dp_index] | ||
| if _images: | ||
| await asyncio.to_thread(_images[-1][1].wait) | ||
| for img, result in _images: | ||
| ok = await asyncio.to_thread(result.wait, wait_timeout) | ||
| if not ok: | ||
| errors.append( | ||
| f"timeout dp={dp_index} uuid={img.uuid} " | ||
| f"md5={getattr(img, 'md5', None)} timeout={wait_timeout}s" | ||
| ) | ||
| continue | ||
| if not result.success: | ||
| errors.append( | ||
| f"failed dp={dp_index} uuid={img.uuid} " f"md5={getattr(img, 'md5', None)} error={result.error}" | ||
| ) | ||
| if errors: | ||
| raise RuntimeError("visual infer batch had failures: " + "; ".join(errors)) | ||
| return | ||
|
|
||
| async def infer_images(self, dp_index: int, images, events): | ||
| async def infer_images(self, dp_index: int, images, results): | ||
| taskes = [] | ||
| for vit_tp_rank in range(self.vit_tp): | ||
| task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, events) | ||
| task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, results) | ||
| taskes.append(task) | ||
| await asyncio.gather(*taskes) | ||
|
|
||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RPyC connections are not thread-safe. Since `self.cache_client` is called from threads in `self._cache_executor` (which has `max_workers=2`), concurrent requests can call `get_items_embed` concurrently, leading to protocol corruption or connection hangs. We should initialize a lock to serialize all RPyC calls on `self.cache_client`.