From 50d6a0afe0105143df1df7479cf7edeada673c8b Mon Sep 17 00:00:00 2001 From: sufubao Date: Mon, 11 May 2026 18:25:15 +0800 Subject: [PATCH] fix(visualserver): contain visual worker failures Backport visualserver failure containment from upstream/qwen35. Keep worker loops alive after per-batch failures, propagate per-image failure status back to the manager, bound visual waits and cache RPCs, harden proxy/visual paths, and expose worker death through watchdog sentinels checked by /health. Adapted for main: dropped the qwen35 image/dependency update and qwen35-only visual token-budget flags. (cherry picked from commit 2b19512839683afca1a5092719937a10786f4532) --- lightllm/models/qwen3_vl/qwen3_visual.py | 30 +- lightllm/server/api_cli.py | 34 ++ lightllm/server/core/objs/start_args_type.py | 3 + lightllm/server/visualserver/manager.py | 133 +++++- .../visualserver/model_infer/model_rpc.py | 398 +++++++++++++++++- .../model_infer/model_rpc_client.py | 44 +- lightllm/server/visualserver/objs.py | 8 + lightllm/server/visualserver/proxy_manager.py | 41 +- .../visualserver/visual_only_manager.py | 4 +- lightllm/utils/health_check.py | 34 +- 10 files changed, 662 insertions(+), 67 deletions(-) diff --git a/lightllm/models/qwen3_vl/qwen3_visual.py b/lightllm/models/qwen3_vl/qwen3_visual.py index 151ea475e5..25b8bb022f 100644 --- a/lightllm/models/qwen3_vl/qwen3_visual.py +++ b/lightllm/models/qwen3_vl/qwen3_visual.py @@ -376,17 +376,27 @@ def encode(self, images: List[ImageItem]): valid_id = 0 img_grids = [] uuids = [] + # 任意一张图片解码 / preprocess 失败都视为请求级失败: + # 跳过这张图但保留 batch 中其他图片, 否则一张截断图就会让整个 ViT worker 崩溃。 + failed_images = [] for i, img in enumerate(images): - if isinstance(img, ImageItem): - uuids.append(img.uuid) + if not isinstance(img, ImageItem): + raise Exception("Unsupported input types: {} for {}".format(type(img), img)) + try: image_data = read_shm(get_shm_name_data(img.uuid)) image_data = Image.open(BytesIO(image_data)) pixel_values, image_grid_thw = self.processor.preprocess(image_data) + except Exception as e: + logger.exception( + f"image preprocess failed, treating as per-request error: " + f"uuid={img.uuid} md5={getattr(img, 'md5', None)}: {e}" + ) + failed_images.append(img) + continue - img_tensors.append(pixel_values) - img_grids.append(image_grid_thw) - else: - raise Exception("Unsupported input types: {} for {}".format(type(img), img)) + uuids.append(img.uuid) + img_tensors.append(pixel_values) + img_grids.append(image_grid_thw) # must devide merge_length cur_num = img_tensors[-1].shape[0] // (self.spatial_merge_size ** 2) @@ -394,8 +404,14 @@ def encode(self, images: List[ImageItem]): valid_ids.append([valid_id, valid_id + cur_num]) valid_id += cur_num + # 失败的图片直接打通知 event, 释放 manager 端的等待和 shm req 资源。 + # 这里通过 attribute 标记, 由调用方 (model_rpc) 负责设置 event 并释放信号量。 + for img in failed_images: + img.preprocess_failed = True + if len(img_tensors) <= 0: - return None + # 整 batch 都失败: 返回空结果由 _infer_worker 统一处理 preprocess_failed 的 image。 + return None, [], [] imgs = torch.cat(img_tensors, dim=0) grid_thw = torch.cat(img_grids, dim=0) diff --git a/lightllm/server/api_cli.py b/lightllm/server/api_cli.py index 7e40421140..08d5535e57 100644 --- a/lightllm/server/api_cli.py +++ b/lightllm/server/api_cli.py @@ -483,6 +483,40 @@ def make_argument_parser() -> argparse.ArgumentParser: parser.add_argument( "--visual_infer_batch_size", type=int, default=None, help="number of images to process in each inference batch" ) + parser.add_argument( + "--visual_infer_timeout", + type=int, + default=120, + help=""" + Bound on how long VisualManager.handle_images() waits on the per-batch event + (in seconds). Prevents a stalled or crashed ViT worker from blocking visual + requests forever and exhausting the default asyncio executor (see the + 2026-05-09 incident). On timeout the request is marked aborted and forwarded + to the next module for normal abort handling. Set to 0 to use the default. + """, + ) + parser.add_argument( + "--visual_set_items_embed_timeout", + type=int, + default=30, + help=""" + Bound (seconds) on the synchronous RPyC call to embed_cache_client.set_items_embed + inside the visualserver store worker. A hung cache server used to silently block + the store worker forever (2026-05-09 incident); on timeout the batch is reported + back as failed so manager triggers per-request abort. Set to 0 to use the default. + """, + ) + parser.add_argument( + "--visual_get_items_embed_timeout", + type=int, + default=10, + help=""" + Bound (seconds) on the synchronous get_items_embed cache RPC issued from + VisualManager.handle_group_indexes before deciding which images need ViT. Wraps + the call in asyncio.to_thread + wait_for so a hung cache cannot block the visual + event loop or skip the abort path (2026-05-09 incident). + """, + ) parser.add_argument( "--visual_send_batch_size", type=int, diff --git a/lightllm/server/core/objs/start_args_type.py b/lightllm/server/core/objs/start_args_type.py index e7f35780a4..e30fa759d4 100644 --- a/lightllm/server/core/objs/start_args_type.py +++ b/lightllm/server/core/objs/start_args_type.py @@ -109,6 +109,9 @@ class StartArgs: push_interval: int = field(default=10) visual_node_id: int = field(default=None) visual_infer_batch_size: int = field(default=None) + visual_infer_timeout: int = field(default=120) + visual_set_items_embed_timeout: int = field(default=30) + visual_get_items_embed_timeout: int = field(default=10) visual_send_batch_size: int = field(default=1) visual_gpu_ids: List[int] = field(default_factory=lambda: [0]) visual_tp: int = field(default=1) diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index 1dffdaf681..e67f84f066 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -1,6 +1,7 @@ import zmq import zmq.asyncio import asyncio +import concurrent.futures import uvloop import rpyc import socket @@ -27,6 +28,40 @@ logger = init_logger(__name__) +class VisualInferResult: + """Per-image visual inference outcome. + + Combines completion signaling with success/failure status so the manager can + distinguish "embed is in cache, OK to forward" from "ViT failed, abort the + request". Workers call ``mark_success`` / ``mark_failure`` (the manager + receives the call via the RPyC netref) and the manager waits + inspects + ``success`` after ``event.wait()`` returns. + + Before this object was introduced, workers only signaled a bare + ``threading.Event`` on both success and failure, so the manager forwarded + failed requests to the router with missing embeddings (2026-05-09 incident). + """ + + __slots__ = ("event", "success", "error") + + def __init__(self): + self.event = threading.Event() + self.success = False + self.error: str = "" + + def mark_success(self): + self.success = True + self.event.set() + + def mark_failure(self, error: str = ""): + # success stays False; record the reason so manager logs are useful. + self.error = error or self.error or "unknown" + self.event.set() + + def wait(self, timeout): + return self.event.wait(timeout) + + class VisualManager: def __init__( self, @@ -48,8 +83,24 @@ def __init__( self.zmq_recv_socket = context.socket(zmq.PULL) self.zmq_recv_socket.bind(f"{args.zmq_mode}127.0.0.1:{args.visual_port}") - self.cache_client = rpyc.connect("localhost", args.cache_port, config={"allow_pickle": True}) + # sync_request_timeout 让阻塞的 RPyC 调用 (get_items_embed/set_items_embed) 从 socket + # 层真正抛 TimeoutError, 避免泄漏 default executor 线程导致 net-io 饿死。 + cache_rpyc_timeout = max( + int(getattr(args, "visual_get_items_embed_timeout", 0) or 0), + int(getattr(args, "visual_set_items_embed_timeout", 0) or 0), + 10, + ) + self.cache_client = rpyc.connect( + "localhost", + args.cache_port, + config={"allow_pickle": True, "sync_request_timeout": cache_rpyc_timeout}, + ) self.cache_client._channel.stream.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + # 专用 executor: 同步 cache RPC 都走这个池, 与 asyncio default executor 隔离, + # 即便某次 cache 调用 hang (sync_request_timeout 兜底前) 也不会饿死 zmq recv_pyobj。 + self._cache_executor = concurrent.futures.ThreadPoolExecutor( + max_workers=2, thread_name_prefix="visual_cache_rpc" + ) self.model_weightdir = args.model_dir self.vit_dp = args.visual_dp self.vit_tp = args.visual_tp @@ -60,12 +111,10 @@ def __init__( self.lock = asyncio.Lock() async def wait_to_model_ready(self): - self.model_rpcs: List[List[VisualModelRpcClient]] = [[] for _ in range(self.vit_dp)] self.vit_attn_backend = init_vit_att_backend(index=0) for dp_rank_id in range(self.vit_dp): for tp_rank_id in range(self.vit_tp): - rpc_model = await start_model_process() self.model_rpcs[dp_rank_id].append(rpc_model) @@ -92,6 +141,12 @@ async def wait_to_model_ready(self): return def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[ImageItem]: + """同步路径: 检查 shm req 是否 aborted, 必要时调用 embed cache 询问哪些 image 已就绪。 + + ``cache_client.root.get_items_embed`` 是同步 RPyC 调用; 调用方需要包一层 + ``asyncio.to_thread`` 并加超时, 避免 embed cache 卡死时阻塞 asyncio 事件循环 + (2026-05-09 incident)。 + """ shm_req = self.shm_req_manager.get_req_obj_by_index(group_req_indexes.shm_req_indexes[0]) is_aborted = shm_req.is_aborted disable_prompt_cache = shm_req.sample_params.disable_prompt_cache @@ -122,15 +177,37 @@ def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[Imag return images_need_infer async def handle_group_indexes(self, group_req_indexes: GroupReqIndexes): - images_need_infer = self.get_need_infer_images(group_req_indexes) + # 把 get_need_infer_images 也放在 try 内, 并通过 dedicated executor + wait_for 加超时, + # 因为内部包含同步 RPyC 调用 (cache_client.root.get_items_embed); embed cache 卡死时 + # 不应阻塞 asyncio 事件循环或绕过 abort 路径 (2026-05-09 incident)。 + # 关键: 用 _cache_executor 而不是 default executor, 否则卡死的 cache 调用会占满 + # default executor 进而饿死 loop_for_netio_req 里的 zmq_recv_socket.recv_pyobj。 + cache_timeout = max(int(getattr(self.args, "visual_get_items_embed_timeout", 0) or 0), 0) or 10 + loop = asyncio.get_running_loop() + try: + images_need_infer = await asyncio.wait_for( + loop.run_in_executor(self._cache_executor, self.get_need_infer_images, group_req_indexes), + timeout=cache_timeout, + ) + + if len(images_need_infer) == 0: + self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) + return - if len(images_need_infer) == 0: - self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) - return - else: await self.handle_images(images_need_infer) - self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) - return + except Exception as e: + # visual 推理失败 (例如 worker 异常 / event 等待超时 / get_items_embed 卡住), + # 把请求标记为 aborted 再转发, 由下游 router 正常走 abort 释放路径。 + logger.exception( + f"handle_group_indexes failed, marking group_req_id={group_req_indexes.group_req_id} aborted: {e}" + ) + for shm_req_index in group_req_indexes.shm_req_indexes: + shm_req = self.shm_req_manager.get_req_obj_by_index(shm_req_index) + shm_req.is_aborted = True + self.shm_req_manager.put_back_req_obj(shm_req) + + self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) + return async def handle_images(self, images_need_infer: List[ImageItem]): if not hasattr(self, "cur_dp_index"): @@ -140,14 +217,14 @@ async def handle_images(self, images_need_infer: List[ImageItem]): for image in images_need_infer: self.cur_dp_index += 1 select_dp = self.cur_dp_index % self.vit_dp - dp_to_handle_images[select_dp].append((image, threading.Event())) + dp_to_handle_images[select_dp].append((image, VisualInferResult())) taskes = [] for dp_index in range(self.vit_dp): _images = dp_to_handle_images[dp_index] if _images: taskes.append( - self.infer_images(dp_index, images=[e[0] for e in _images], events=[e[1] for e in _images]) + self.infer_images(dp_index, images=[e[0] for e in _images], results=[e[1] for e in _images]) ) async with self.lock: @@ -157,17 +234,39 @@ async def handle_images(self, images_need_infer: List[ImageItem]): logger.exception(str(e)) raise e - # 等待推理通知已经 ok + # 等待每张图片各自的完成事件 + 检查成功状态。 + # event.wait() 必须有 timeout, 否则 ViT worker 异常退出 / cache 卡死时, 这里 + # 永远不会返回, 同时被 asyncio.to_thread 占用的 default executor 线程也会被耗尽 + # (2026-05-09 incident)。 + # + # 关键: 即使发现有失败 image, 也要先把同 batch 中其他 image 都等到 (success 或 timeout), + # 再统一抛出。否则当一张 preprocess_failed 的图片先触发 mark_failure 时, 我们立刻 + # raise → 上层走 abort → 下游释放 multimodal cache id, 但 store_worker 此刻还在 + # 给同 batch 中成功的 image 写 embedding, 造成写到已释放槽位的竞态。 + wait_timeout = max(int(getattr(self.args, "visual_infer_timeout", 0) or 0), 0) or 120 + errors: List[str] = [] for dp_index in range(self.vit_dp): _images = dp_to_handle_images[dp_index] - if _images: - await asyncio.to_thread(_images[-1][1].wait) + for img, result in _images: + ok = await asyncio.to_thread(result.wait, wait_timeout) + if not ok: + errors.append( + f"timeout dp={dp_index} uuid={img.uuid} " + f"md5={getattr(img, 'md5', None)} timeout={wait_timeout}s" + ) + continue + if not result.success: + errors.append( + f"failed dp={dp_index} uuid={img.uuid} " f"md5={getattr(img, 'md5', None)} error={result.error}" + ) + if errors: + raise RuntimeError("visual infer batch had failures: " + "; ".join(errors)) return - async def infer_images(self, dp_index: int, images, events): + async def infer_images(self, dp_index: int, images, results): taskes = [] for vit_tp_rank in range(self.vit_tp): - task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, events) + task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, results) taskes.append(task) await asyncio.gather(*taskes) diff --git a/lightllm/server/visualserver/model_infer/model_rpc.py b/lightllm/server/visualserver/model_infer/model_rpc.py index 50bc12fd23..9b47579c53 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc.py +++ b/lightllm/server/visualserver/model_infer/model_rpc.py @@ -1,3 +1,4 @@ +import os import rpyc import torch import socket @@ -115,7 +116,17 @@ def exposed_init_model(self, kvargs): self.model.load_model(weight_dir) self.model = self.model.cuda() if not self.is_visual_only_mode: - self.cache_client = rpyc.connect("localhost", self.cache_port, config={"allow_pickle": True}) + # sync_request_timeout 让阻塞的 RPyC 调用从 socket 层真正抛 TimeoutError, + # 避免我们手写的 _call_with_timeout 留下"永远 hang 在 sock.recv 上"的孤儿线程 + # (2026-05-09 incident, AC#8)。 + set_items_embed_timeout = ( + float(getattr(get_env_start_args(), "visual_set_items_embed_timeout", 0) or 0) or 30.0 + ) + self.cache_client = rpyc.connect( + "localhost", + self.cache_port, + config={"allow_pickle": True, "sync_request_timeout": set_items_embed_timeout}, + ) self.cache_client._channel.stream.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.cpu_embed_cache_client = CpuEmbedCacheClient(create_meta_data=False, init_shm_data=False) else: @@ -143,11 +154,16 @@ def exposed_init_model(self, kvargs): set_random_seed(2147483647) return - def exposed_run_task(self, images: List["ImageItem"], ref_event_list: List[threading.Event]): + def exposed_run_task(self, images: List["ImageItem"], ref_result_list): try: + # 长驻 worker 已挂掉时直接抛错, 让 manager 端走 abort 路径, + # 避免请求被 enqueue 后永远卡在 result.wait() 上 (2026-05-09 incident)。 + self._assert_workers_alive() images = obtain(images) for i in range(len(images)): - images[i].event = ref_event_list[i] + # ref_result_list[i] 是 manager 端 VisualInferResult 的 RPyC netref; + # 调用其 mark_success / mark_failure 会回到 manager 进程修改状态并触发 event。 + images[i].result = ref_result_list[i] images[i].start_time = time.time() self.infer_queue.put(images[i]) @@ -156,6 +172,17 @@ def exposed_run_task(self, images: List["ImageItem"], ref_event_list: List[threa raise e return + def _assert_workers_alive(self): + infer_alive = getattr(self, "_infer_thread", None) is not None and self._infer_thread.is_alive() + store_alive = getattr(self, "_store_thread", None) is not None and self._store_thread.is_alive() + if not (infer_alive and store_alive): + msg = ( + f"visual worker dead: infer_alive={infer_alive} store_alive={store_alive} " + f"dp={getattr(self, 'dp_rank_id', '?')} tp={getattr(self, 'tp_rank_id', '?')}" + ) + logger.error(msg) + raise RuntimeError(msg) + def _log_latency(self, image: ImageItem, stage: str): latency = time.time() - image.start_time if latency > 0.02: @@ -173,6 +200,11 @@ def _init_taskes(self): # 限制并发, 主要是为了控制内存用量,防止过多造成内存OOM self.sempare = threading.Semaphore(self.infer_max_batch_size * 8) + # 串行化经过 self.cache_client 的同步 RPyC 调用 (set_items_embed 等)。 + # 单线程持有的锁, 防止超时后台 thread 与新一轮调用共用同一条 RPyC 连接, + # 同时把"卡死的 cache 调用"维持在最多 1 个 leaked thread 内 (而非每次 timeout 都新增一个)。 + self._cache_call_lock = threading.Lock() + # 用于同步各个推理tp每次拿到一样的image数量建立的gloo通信组 self.gloo_group = dist.new_group(ranks=list(range(self.vit_tp)), backend="gloo") @@ -182,8 +214,159 @@ def _init_taskes(self): self._store_thread = threading.Thread(target=self._store_worker, daemon=True) self._store_thread.start() + + # 周期性 watchdog: 不依赖新 run_task 触发, 即使一段时间没流量也能在第一时间发现 + # worker 意外死亡并打错误日志, 满足 issue.md 验收 9 (watchdog / health signal)。 + self._workers_dead_reported = False + # Stale sentinel cleanup: glob ALL visual-unhealthy sentinels on the host + # (not just the current unique_name) and remove any whose recorded PID is + # no longer alive. Handles both: + # * crash + same-name restart (old code already covered this) + # * crash + different-unique-name restart (e.g. port change, run mode change) + # where the old sentinel would otherwise linger forever and falsely trip + # external glob-based probes. + self._sweep_stale_sentinels() + self._watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True, name="visual_watchdog") + self._watchdog_thread.start() return + @staticmethod + def _parse_env_bool(name: str) -> bool: + """Truthy env-var parser that treats "0"/"false"/"no"/"off" as false. + + ``bool(os.getenv(name))`` is wrong because the empty string is the only + falsy str — "0", "false", "False" would all evaluate true. + """ + raw = os.getenv(name) + if raw is None: + return False + return raw.strip().lower() not in ("", "0", "false", "no", "off") + + _SENTINEL_PREFIX = "/tmp/lightllm_visual_unhealthy_" + + def _watchdog_unhealthy_path(self) -> str: + """Sentinel file path consumed by /health and external probes. + + Includes the unique server name so multiple LightLLM instances on the same + host don't collide on /tmp paths. Format must stay in sync with + ``lightllm.utils.health_check.scan_visual_unhealthy_sentinels``. + """ + from lightllm.utils.envs_utils import get_unique_server_name + + return f"{self._SENTINEL_PREFIX}{get_unique_server_name()}_dp{self.dp_rank_id}_tp{self.tp_rank_id}" + + def _sweep_stale_sentinels(self): + """Remove any visual-unhealthy sentinels whose writer PID is dead. + + Glob-based external probes need this — without it, a process crash leaves + the file behind forever and probes mark a freshly-healthy service unhealthy. + We identify staleness by checking the embedded PID with ``os.kill(pid, 0)``. + """ + import glob + + for path in glob.glob(self._SENTINEL_PREFIX + "*"): + try: + with open(path) as fh: + content = fh.read() + pid = None + for tok in content.split(): + if tok.startswith("pid="): + try: + pid = int(tok.split("=", 1)[1]) + except ValueError: + pid = None + break + stale = False + if pid is None: + # Pre-PID file format from an older crash — treat as stale. + stale = True + else: + try: + os.kill(pid, 0) + except ProcessLookupError: + stale = True + except PermissionError: + # PID belongs to another user — assume still alive, don't remove. + stale = False + if stale: + os.remove(path) + logger.info(f"[watchdog] removed stale sentinel from dead PID: {path}") + except FileNotFoundError: + pass + except Exception: + logger.exception(f"[watchdog] sweep failed for {path}; continuing") + + def _watchdog_loop(self): + """Periodic ViT-worker liveness check. + + On detected death we: + 1. Log ERROR once (with queue sizes for diagnostics). + 2. Write a sentinel file at ``_watchdog_unhealthy_path()`` — operators / + external probes can `test -e` this to flag the visualserver unhealthy + without needing to talk to the RPC server. + 3. Optionally ``os._exit(1)`` if ``LIGHTLLM_VISUAL_EXIT_ON_WORKER_DEATH`` is + set, so a process supervisor (systemd / k8s / parent_check) can perform + controlled restart. Off by default — letting the process linger preserves + debuggability; opt-in enables auto-recovery. + + Without traffic the previous `_assert_workers_alive()` gate would never fire, + so the watchdog guarantees an active signal regardless of request rate. + """ + exit_on_death = self._parse_env_bool("LIGHTLLM_VISUAL_EXIT_ON_WORKER_DEATH") + while True: + try: + time.sleep(30) + infer_alive = self._infer_thread.is_alive() + store_alive = self._store_thread.is_alive() + queue_size = self.infer_queue.qsize() + store_size = self.store_queue.qsize() + if not (infer_alive and store_alive): + if not self._workers_dead_reported: + logger.error( + f"[watchdog] visual worker dead: " + f"infer_alive={infer_alive} store_alive={store_alive} " + f"dp={self.dp_rank_id} tp={self.tp_rank_id} " + f"infer_queue={queue_size} store_queue={store_size}" + ) + # Sentinel file for external probes / k8s readiness checks. + # Embed the writer PID so a future restart can sweep stale files + # whose PID is no longer alive (see _sweep_stale_sentinels). + try: + with open(self._watchdog_unhealthy_path(), "w") as fh: + fh.write( + f"pid={os.getpid()} " + f"infer_alive={infer_alive} store_alive={store_alive} " + f"dp={self.dp_rank_id} tp={self.tp_rank_id} " + f"at={time.time()}\n" + ) + except Exception: + logger.exception("[watchdog] failed to write sentinel file; continuing") + self._workers_dead_reported = True + if exit_on_death: + logger.error( + "[watchdog] LIGHTLLM_VISUAL_EXIT_ON_WORKER_DEATH set; " + "exiting visual rpc process so supervisor can restart" + ) + os._exit(1) + else: + # Sentinel cleanup on recovery (defensive — worker threads do not actually revive). + if self._workers_dead_reported: + logger.warning("[watchdog] workers recovered from previous dead state") + try: + path = self._watchdog_unhealthy_path() + if os.path.exists(path): + os.remove(path) + except Exception: + logger.exception("[watchdog] sentinel cleanup failed; continuing") + self._workers_dead_reported = False + logger.debug( + f"[watchdog] alive infer={infer_alive} store={store_alive} " + f"infer_queue={queue_size} store_queue={store_size}" + ) + except Exception: + # watchdog 本身不能死掉 + logger.exception("[watchdog] loop iteration raised; continuing") + # @calculate_time(show=True, min_cost_ms=150) @torch.no_grad() def _forward(self, images: List[ImageItem]): @@ -238,9 +421,16 @@ def _get_image_items_from_store_queue(self, max_num: int) -> List[ImageItem]: def _infer_worker(self): """ 任务处理循环: 从队列中取出任务, 执行完成后通知调用者 + + 一个 batch 出错 (例如截断图片导致 PIL 解码失败) 不能让长驻线程退出, + 否则后续所有图片请求都会卡死在 ``image.result.event.wait()`` 上。 + 失败要通过 ``image.result.mark_failure`` 通知 manager, 让其走 abort 路径, + 否则下游 router 会拿到没有 embedding 的请求 (2026-05-09 incident)。 """ torch.cuda.set_device(self.device_id) while True: + images: List[ImageItem] = [] + handed_off = False try: # 从队列获取任务, 阻塞等待 if self.tp_rank_id == 0: @@ -255,19 +445,50 @@ def _infer_worker(self): self._log_latency(image, stage="queue_cost_time") # 执行任务: 调用父类的forward方法处理图像 + # 部分 visual model (qwen3_vl) 会把单张坏图标记为 image.preprocess_failed, + # 这里从 batch 中拆出失败 image 单独标记 mark_failure, 保证 batch 中其他正常图片继续。 all_img_embeds, uuids, valid_ids = self._forward(images) + + successful_images = [img for img in images if not getattr(img, "preprocess_failed", False)] + failed_images = [img for img in images if getattr(img, "preprocess_failed", False)] + # 先把 failed_images 从 local images 里移走再处理, 避免后面 _store 失败时 + # _fail_batch 重复释放它们的 semaphore / 重复 mark_failure。 + images = list(successful_images) + + for image in failed_images: + if self.tp_rank_id == 0: + self._mark_failure(image, "preprocess_failed") + try: + self.sempare.release() + except Exception: + logger.exception("semaphore release for preprocess-failed image raised; continuing") + + if not successful_images: + continue + all_img_embeds = all_img_embeds.to(torch.device("cuda")) + # _store_to_* 内部全量准备好状态再统一入队, 入队完成后才把 ownership 移交给 store_worker。 + # 在入队前抛异常时, images 仍然有效, 外层 except 会通过 _fail_batch 通知所有图片失败。 if self.is_visual_only_mode: - self._store_to_afs(all_img_embeds, valid_ids, images) + self._store_to_afs(all_img_embeds, valid_ids, successful_images) else: - self._store_to_cpu_cache(all_img_embeds, valid_ids, images) + self._store_to_cpu_cache(all_img_embeds, valid_ids, successful_images) + + # 入队成功后, ownership 已经移交给 store_worker。同 batch 中的 failed_images + # 已经在前面处理完, 此处只需要让 finally 不要再处理已经入队的 successful_images。 + handed_off = True + images = [] except Exception as e: - logger.exception(str(e)) - raise e + # handed_off=True 时不会到这里 (上面已经将 images 清空), 但即使到了也不会双重处理。 + if not handed_off: + self._fail_batch(images, stage="_infer_worker", exc=e) + else: + logger.exception(f"_infer_worker post-handoff exception (ignored): {e}") def _store_to_cpu_cache(self, all_img_embeds, valid_ids, images): + """全量准备好 cuda event 后再统一入队, 避免中途失败留下半批已入队的 image。""" for i in range(len(images)): start, end = valid_ids[i] image = images[i] @@ -278,6 +499,7 @@ def _store_to_cpu_cache(self, all_img_embeds, valid_ids, images): cuda_event = torch.cuda.Event() cuda_event.record() image.cuda_event = cuda_event + for image in images: self.store_queue.put(image) def _store_to_afs(self, all_img_embeds, valid_ids, images): @@ -287,16 +509,20 @@ def _store_to_afs(self, all_img_embeds, valid_ids, images): start, end = valid_id gen_embed = all_img_embeds[start:end] image.gen_embed = gen_embed + for image in images: self.store_queue.put(image) def _store_worker(self): """ 任务处理循环: 从队列中取出ImageItem和embed 放入 afs中, 执行完成后通知调用者 + + 与 ``_infer_worker`` 同样需要在异常时不退出, 保证长驻线程持续消费 store_queue。 """ while True: + images: List[ImageItem] = [] try: # 从队列获取任务, 阻塞等待 - images: List[ImageItem] = self._get_image_items_from_store_queue(max_num=self.infer_max_batch_size) + images = self._get_image_items_from_store_queue(max_num=self.infer_max_batch_size) if self.is_visual_only_mode: self._commit_to_afs(images=images) @@ -306,16 +532,21 @@ def _store_worker(self): for _ in images: self.sempare.release() + images = [] + except Exception as e: - logger.exception(str(e)) - raise e + self._fail_batch(images, stage="_store_worker", exc=e) def _commit_to_afs(self, images): if self.tp_rank_id == 0: for image in images: - self.afs_handler.insert(image.md5, image.gen_embed) - self._log_latency(image, stage="store_to_afs") - image.event.set() + try: + self.afs_handler.insert(image.md5, image.gen_embed) + self._log_latency(image, stage="store_to_afs") + self._mark_success(image) + except Exception as e: + logger.exception(f"afs insert failed for md5={image.md5}: {e}") + self._mark_failure(image, f"afs_insert_failed: {e}") self._log_latency(image, stage="set_event") def _commit_to_cpu_cache(self, images): @@ -326,11 +557,148 @@ def _commit_to_cpu_cache(self, images): self._log_latency(image, stage="inference") uuids = [image.uuid for image in images] - self.cache_client.root.set_items_embed(uuids) + timeout = float(getattr(self.args, "visual_set_items_embed_timeout", 0) or 0) or 30.0 + logger.info(f"set_items_embed START dp={self.dp_rank_id} batch={len(uuids)} timeout={timeout}s") + cache_ok = False + cache_err: str = "" + try: + # set_items_embed 是同步 RPyC 调用, embed cache 卡死时它本身不会返回。 + # 用线程 + join(timeout) 包一层, 同时通过 _cache_call_lock 串行化所有 cache 调用, + # 卡住时让本批次直接以 cache_failed 告知 manager, 并防止 daemon 线程累积。 + self._call_cache_with_timeout( + self.cache_client.root.set_items_embed, + args=(uuids,), + timeout=timeout, + desc="set_items_embed", + ) + cache_ok = True + logger.info(f"set_items_embed DONE dp={self.dp_rank_id} batch={len(uuids)}") + except Exception as e: + cache_err = str(e) + logger.exception( + f"set_items_embed failed dp={self.dp_rank_id} batch={len(uuids)} " f"uuids={uuids}: {e}" + ) for image in images: self._log_latency(image, stage="set_items_embed") for image in images: - image.event.set() + if cache_ok: + self._mark_success(image) + else: + self._mark_failure(image, f"set_items_embed_failed: {cache_err}") self._log_latency(image, stage="set_event") + + def _call_cache_with_timeout(self, fn, args, timeout, desc: str): + """Serialize cache_client RPC calls + bound wall-clock budget. + + A persistent embed-cache stall used to leak unbounded daemon threads on a + shared ``self.cache_client`` connection. The lock guarantees only one + thread is actually issuing an RPyC call at a time. New callers either + succeed within ``timeout`` or time out cleanly via ``_call_with_timeout``; + they do not pile up against the connection. + """ + # Test affordance: inject a stall to verify the timeout path. Set + # LIGHTLLM_VISUAL_INJECT_CACHE_STALL= to make every cache RPC + # sleep that many seconds instead of running. Used to exercise issue.md + # acceptance criterion 8 end-to-end without standing up a broken cache server. + stall_s = os.getenv("LIGHTLLM_VISUAL_INJECT_CACHE_STALL", "") + if stall_s: + try: + stall = float(stall_s) + except ValueError: + stall = 0.0 + if stall > 0: + fn = lambda *a, **k: time.sleep(stall) or None # noqa: E731 + + def _locked_call(*locked_args): + # Sub-timeout acquire so a stuck predecessor doesn't park us indefinitely; + # the outer _call_with_timeout still bounds the whole call. + if not self._cache_call_lock.acquire(timeout=timeout): + raise TimeoutError( + f"{desc} could not acquire _cache_call_lock within {timeout}s (cache appears stalled)" + ) + try: + return fn(*locked_args) + finally: + self._cache_call_lock.release() + + return self._call_with_timeout(_locked_call, args=args, timeout=timeout, desc=desc) + + @staticmethod + def _call_with_timeout(fn, args, timeout, desc: str): + """同步调用 ``fn(*args)``, 超过 ``timeout`` 秒抛 TimeoutError。 + + 线程化等待避免 RPyC 同步调用永远阻塞调用方。注意: 后台线程在超时后仍会 + 持续运行, 因为我们无法安全地取消一个 C-extension 内的远端调用, 但它不会 + 阻塞 worker 推进 (worker 已经走 mark_failure 通知 manager)。 + """ + result: List = [None] + exc: List[Optional[BaseException]] = [None] + done = threading.Event() + + def runner(): + try: + result[0] = fn(*args) + except BaseException as e: + exc[0] = e + finally: + done.set() + + t = threading.Thread(target=runner, daemon=True, name=f"call_with_timeout_{desc}") + t.start() + if not done.wait(timeout): + raise TimeoutError(f"{desc} timed out after {timeout}s") + if exc[0] is not None: + raise exc[0] + return result[0] + + def _mark_success(self, image: ImageItem): + """成功完成单张图片: 通过 RPyC netref 通知 manager 端的 VisualInferResult。""" + result_ref = getattr(image, "result", None) + if result_ref is None: + return + try: + result_ref.mark_success() + except Exception: + logger.exception( + f"mark_success failed uuid={getattr(image, 'uuid', None)} md5={getattr(image, 'md5', None)}" + ) + + def _mark_failure(self, image: ImageItem, reason: str): + """单张图片失败: 通过 RPyC netref 让 manager 走 abort 路径。""" + result_ref = getattr(image, "result", None) + if result_ref is None: + return + try: + result_ref.mark_failure(reason) + except Exception: + logger.exception( + f"mark_failure failed uuid={getattr(image, 'uuid', None)} " + f"md5={getattr(image, 'md5', None)} reason={reason}" + ) + + def _fail_batch(self, images: List[ImageItem], stage: str, exc: Exception): + """ + Per-batch 失败兜底: 记录上下文, 释放信号量, 把所有 image 标记为失败, 然后继续 worker 循环。 + + - 仅 rank 0 持有外部 result 引用, 因此只在 rank 0 调 mark_failure。 + - 信号量在所有 rank 上各自维护, 因此所有 rank 都需要释放。 + - 任何一步出错都被吞掉, 因为 worker 线程不能因为兜底逻辑再次退出。 + - 失败状态会让 manager.handle_images 的 wait 后检查抛 RuntimeError, 触发 abort 路径。 + """ + uuids_log = [getattr(img, "uuid", None) for img in images] + md5s_log = [getattr(img, "md5", None) for img in images] + logger.exception( + f"{stage} batch failed, recovering and continuing: " + f"batch_size={len(images)} dp={self.dp_rank_id} tp={self.tp_rank_id} " + f"uuids={uuids_log} md5s={md5s_log}: {exc}" + ) + reason = f"{stage}: {exc}" + for image in images: + if self.tp_rank_id == 0: + self._mark_failure(image, reason) + try: + self.sempare.release() + except Exception: + logger.exception("semaphore release during fail_batch raised; continuing") diff --git a/lightllm/server/visualserver/model_infer/model_rpc_client.py b/lightllm/server/visualserver/model_infer/model_rpc_client.py index 682d6affcc..b4cc527707 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc_client.py +++ b/lightllm/server/visualserver/model_infer/model_rpc_client.py @@ -1,36 +1,52 @@ import asyncio import rpyc -import threading +from rpyc.core.async_ import AsyncResultTimeout from typing import Dict, List, Tuple, Deque, Optional, Union from lightllm.server.multimodal_params import ImageItem from .model_rpc import VisualModelRpcServer +# init_model loads weights and runs autotune/cuda-graph capture — slow on cold start +# but bounded; pick a generous timeout so we still detect a truly hung worker. +_INIT_MODEL_TIMEOUT_S = 600 +# run_task only enqueues images; the actual inference completion is signaled +# separately via VisualInferResult. If the worker is alive, this call returns in ms; +# anything longer means the RPC server is dead/hung and we should fail fast so the +# manager goes down the abort path instead of blocking on ans.wait forever. +_RUN_TASK_TIMEOUT_S = 30 + + class VisualModelRpcClient: def __init__(self, rpc_conn): self.rpc_conn: VisualModelRpcServer = rpc_conn - def async_wrap(f): - f = rpyc.async_(f) + def make_bounded(f, timeout_s: float, op_name: str): + async_f = rpyc.async_(f) async def _func(*args, **kwargs): - ans = f(*args, **kwargs) - await asyncio.to_thread(ans.wait) - # raise if exception + ans: rpyc.AsyncResult = async_f(*args, **kwargs) + # RPyC's AsyncResult.wait() takes no timeout argument (rpyc 5.x); + # set_expiry() configures the deadline, and wait() raises + # AsyncResultTimeout on expiry. Replaces the previous unbounded wait + # that swallowed hung/dead model RPCs and bypassed --visual_infer_timeout. + ans.set_expiry(timeout_s) + try: + await asyncio.to_thread(ans.wait) + except AsyncResultTimeout as e: + raise TimeoutError( + f"{op_name} did not return within {timeout_s}s; visual worker RPC may be hung" + ) from e return ans.value return _func - self._init_model = async_wrap(self.rpc_conn.root.init_model) - self._run_task = async_wrap(self.rpc_conn.root.run_task) + self._init_model = make_bounded(self.rpc_conn.root.init_model, _INIT_MODEL_TIMEOUT_S, "init_model") + self._run_task = make_bounded(self.rpc_conn.root.run_task, _RUN_TASK_TIMEOUT_S, "run_task") return async def init_model(self, kvargs): - ans: rpyc.AsyncResult = self._init_model(kvargs) - await ans - return + return await self._init_model(kvargs) - async def run_task(self, images: List[ImageItem], ref_event_list: List[threading.Event]): - ans = self._run_task(images, ref_event_list) - return await ans + async def run_task(self, images: List[ImageItem], ref_result_list): + return await self._run_task(images, ref_result_list) diff --git a/lightllm/server/visualserver/objs.py b/lightllm/server/visualserver/objs.py index 656f3d3eae..c889962934 100644 --- a/lightllm/server/visualserver/objs.py +++ b/lightllm/server/visualserver/objs.py @@ -8,6 +8,14 @@ "allow_all_attrs": True, "allow_getattr": True, "allow_setattr": True, + # Bound *all* synchronous RPyC calls served on this config so a hung peer cannot + # leave callers blocked forever (2026-05-09 incident). Applies in both directions: + # - client → server: enqueue / one-shot RPCs (proxy_manager → remote visual server) + # - server → client: worker netref calls back into manager (VisualInferResult.mark_*) + # 30s is comfortably above the per-batch latency we see in practice; init_model and + # run_task callers further pass explicit ans.wait timeouts when they need different + # budgets (init is slow, run_task should be fast). + "sync_request_timeout": 30, } diff --git a/lightllm/server/visualserver/proxy_manager.py b/lightllm/server/visualserver/proxy_manager.py index 0c977b2aa9..4596dd4f3e 100644 --- a/lightllm/server/visualserver/proxy_manager.py +++ b/lightllm/server/visualserver/proxy_manager.py @@ -51,14 +51,23 @@ def __init__( ) async def handle_group_indexes(self, group_req_indexes: GroupReqIndexes): - images_need_infer = self.get_need_infer_images(group_req_indexes) - - # case 1 - if len(images_need_infer) == 0: - self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) - return - + # Mirror VisualManager.handle_group_indexes hardening (2026-05-09 incident): + # * get_need_infer_images inside try with bounded executor wait + # * set_items_embed wrapped via dedicated cache executor (sync_request_timeout + # on cache_client also fires at the socket layer) + # * any failure marks shm_req aborted before forwarding + cache_timeout = max(int(getattr(self.args, "visual_get_items_embed_timeout", 0) or 0), 0) or 10 + loop = asyncio.get_running_loop() try: + images_need_infer = await asyncio.wait_for( + loop.run_in_executor(self._cache_executor, self.get_need_infer_images, group_req_indexes), + timeout=cache_timeout, + ) + + # case 1 + if len(images_need_infer) == 0: + self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL) + return def _get_not_afs_ready_images(): readys = self.afs_handler.check_ready([image.md5 for image in images_need_infer]) @@ -92,9 +101,14 @@ def _load_to_cpu_cache(): end = start + tensor.shape[0] assert end - start == image.token_num self.cpu_embed_cache_client.cpu_embed_cache_tensor[start:end].copy_(tensor) + # set_items_embed 也走 cache executor + 受 cache_client 的 sync_request_timeout 兜底 self.cache_client.root.set_items_embed([image.uuid for image in images_need_infer]) - await asyncio.to_thread(_load_to_cpu_cache) + set_timeout = max(int(getattr(self.args, "visual_set_items_embed_timeout", 0) or 0), 0) or 30 + await asyncio.wait_for( + loop.run_in_executor(self._cache_executor, _load_to_cpu_cache), + timeout=set_timeout, + ) except Exception as e: # mark aborted @@ -124,11 +138,18 @@ def run_task(self, conn: rpyc.Connection, images: List[ImageItem]): # 将 bytes 从 shm 中读取出来,放到 image.data_bytes 中,供远端的 vit 进行推理使用。 for image in images: image.data_bytes = read_shm(get_shm_name_data(image.uuid)) + start = time.time() if self.args.detail_log: - start = time.time() logger.info(f"Start to remote infer images {[image.md5 for image in images]}") conn.root.remote_infer_images(images, event) - event.wait(timeout=600) + # 远端 vit 卡死时, 必须让 caller 走 abort 路径而不是默默等待 600s 后假装完成。 + wait_timeout = max(int(getattr(self.args, "visual_infer_timeout", 0) or 0), 0) or 120 + ok = event.wait(timeout=wait_timeout) + if not ok: + raise TimeoutError( + f"remote_infer_images did not signal within {wait_timeout}s " + f"for md5s={[image.md5 for image in images]}; treating as failure" + ) if self.args.detail_log: logger.info( f"Remote infer images done for images {[image.md5 for image in images]}" diff --git a/lightllm/server/visualserver/visual_only_manager.py b/lightllm/server/visualserver/visual_only_manager.py index b06713d87c..ce3fc16b58 100644 --- a/lightllm/server/visualserver/visual_only_manager.py +++ b/lightllm/server/visualserver/visual_only_manager.py @@ -126,8 +126,8 @@ async def wait_to_model_ready(self): async def handle_images(self, images_need_infer: List[ImageItem]): await VisualManager.handle_images(self, images_need_infer=images_need_infer) - async def infer_images(self, dp_index: int, images, events): - await VisualManager.infer_images(self, dp_index=dp_index, images=images, events=events) + async def infer_images(self, dp_index: int, images, results): + await VisualManager.infer_images(self, dp_index=dp_index, images=images, results=results) def clean_up(self): return diff --git a/lightllm/utils/health_check.py b/lightllm/utils/health_check.py index d2a776b862..6aa0cfc609 100644 --- a/lightllm/utils/health_check.py +++ b/lightllm/utils/health_check.py @@ -1,11 +1,11 @@ import os import time from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List from lightllm.server.router.dynamic_prompt.shared_arr import SharedInt from lightllm.utils.log_utils import init_logger -from lightllm.utils.envs_utils import get_unique_server_name +from lightllm.utils.envs_utils import get_unique_server_name, get_env_start_args if TYPE_CHECKING: from lightllm.server.core.objs.shm_req_manager import ShmReqManager @@ -13,6 +13,30 @@ logger = init_logger(__name__) +def scan_visual_unhealthy_sentinels(args) -> List[str]: + """Return descriptions of visual worker watchdog sentinel files.""" + + visual_dp = int(getattr(args, "visual_dp", 0) or 0) + visual_tp = int(getattr(args, "visual_tp", 0) or 0) + if visual_dp <= 0 or visual_tp <= 0: + return [] + + server_name = get_unique_server_name() + dead = [] + for dp in range(visual_dp): + for tp in range(visual_tp): + path = f"/tmp/lightllm_visual_unhealthy_{server_name}_dp{dp}_tp{tp}" + if not os.path.exists(path): + continue + try: + with open(path) as fh: + info = fh.read().strip() + except Exception: + info = "" + dead.append(f"dp={dp} tp={tp}: {info}") + return dead + + @dataclass class HealthObj: grace_timeout: int = int(os.getenv("HEALTH_TIMEOUT", "200")) @@ -25,6 +49,12 @@ def __post_init__(self): def check(self, shm_req_manager: "ShmReqManager") -> bool: """On-the-fly health check: recent success is ok; otherwise require no in-flight shm requests.""" try: + args = get_env_start_args() + visual_dead = scan_visual_unhealthy_sentinels(args) + if visual_dead: + logger.error(f"Health check failed: visual worker watchdog reports dead: {visual_dead}") + return False + now = time.time() last_success_time = self.latest_success_infer_time_mark.get_value()