Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions lightllm/models/qwen3_vl/qwen3_visual.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,26 +376,42 @@ def encode(self, images: List[ImageItem]):
valid_id = 0
img_grids = []
uuids = []
# 任意一张图片解码 / preprocess 失败都视为请求级失败:
# 跳过这张图但保留 batch 中其他图片, 否则一张截断图就会让整个 ViT worker 崩溃。
failed_images = []
for i, img in enumerate(images):
if isinstance(img, ImageItem):
uuids.append(img.uuid)
if not isinstance(img, ImageItem):
raise Exception("Unsupported input types: {} for {}".format(type(img), img))
try:
image_data = read_shm(get_shm_name_data(img.uuid))
image_data = Image.open(BytesIO(image_data))
pixel_values, image_grid_thw = self.processor.preprocess(image_data)
except Exception as e:
logger.exception(
f"image preprocess failed, treating as per-request error: "
f"uuid={img.uuid} md5={getattr(img, 'md5', None)}: {e}"
)
failed_images.append(img)
continue

img_tensors.append(pixel_values)
img_grids.append(image_grid_thw)
else:
raise Exception("Unsupported input types: {} for {}".format(type(img), img))
uuids.append(img.uuid)
img_tensors.append(pixel_values)
img_grids.append(image_grid_thw)

# must devide merge_length
cur_num = img_tensors[-1].shape[0] // (self.spatial_merge_size ** 2)

valid_ids.append([valid_id, valid_id + cur_num])
valid_id += cur_num

# 失败的图片直接打通知 event, 释放 manager 端的等待和 shm req 资源。
# 这里通过 attribute 标记, 由调用方 (model_rpc) 负责设置 event 并释放信号量。
for img in failed_images:
img.preprocess_failed = True

if len(img_tensors) <= 0:
return None
# 整 batch 都失败: 返回空结果由 _infer_worker 统一处理 preprocess_failed 的 image。
return None, [], []

imgs = torch.cat(img_tensors, dim=0)
grid_thw = torch.cat(img_grids, dim=0)
Expand Down
34 changes: 34 additions & 0 deletions lightllm/server/api_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,40 @@ def make_argument_parser() -> argparse.ArgumentParser:
parser.add_argument(
"--visual_infer_batch_size", type=int, default=None, help="number of images to process in each inference batch"
)
parser.add_argument(
"--visual_infer_timeout",
type=int,
default=120,
help="""
Bound on how long VisualManager.handle_images() waits on the per-batch event
(in seconds). Prevents a stalled or crashed ViT worker from blocking visual
requests forever and exhausting the default asyncio executor (see the
2026-05-09 incident). On timeout the request is marked aborted and forwarded
to the next module for normal abort handling. Set to 0 to use the default.
""",
)
parser.add_argument(
"--visual_set_items_embed_timeout",
type=int,
default=30,
help="""
Bound (seconds) on the synchronous RPyC call to embed_cache_client.set_items_embed
inside the visualserver store worker. A hung cache server used to silently block
the store worker forever (2026-05-09 incident); on timeout the batch is reported
back as failed so manager triggers per-request abort. Set to 0 to use the default.
""",
)
parser.add_argument(
"--visual_get_items_embed_timeout",
type=int,
default=10,
help="""
Bound (seconds) on the synchronous get_items_embed cache RPC issued from
VisualManager.handle_group_indexes before deciding which images need ViT. Wraps
the call in asyncio.to_thread + wait_for so a hung cache cannot block the visual
event loop or skip the abort path (2026-05-09 incident).
""",
)
parser.add_argument(
"--visual_send_batch_size",
type=int,
Expand Down
3 changes: 3 additions & 0 deletions lightllm/server/core/objs/start_args_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ class StartArgs:
push_interval: int = field(default=10)
visual_node_id: int = field(default=None)
visual_infer_batch_size: int = field(default=None)
visual_infer_timeout: int = field(default=120)
visual_set_items_embed_timeout: int = field(default=30)
visual_get_items_embed_timeout: int = field(default=10)
visual_send_batch_size: int = field(default=1)
visual_gpu_ids: List[int] = field(default_factory=lambda: [0])
visual_tp: int = field(default=1)
Expand Down
133 changes: 116 additions & 17 deletions lightllm/server/visualserver/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import zmq
import zmq.asyncio
import asyncio
import concurrent.futures
import uvloop
import rpyc
import socket
Expand All @@ -27,6 +28,40 @@
logger = init_logger(__name__)


class VisualInferResult:
"""Per-image visual inference outcome.

Combines completion signaling with success/failure status so the manager can
distinguish "embed is in cache, OK to forward" from "ViT failed, abort the
request". Workers call ``mark_success`` / ``mark_failure`` (the manager
receives the call via the RPyC netref) and the manager waits + inspects
``success`` after ``event.wait()`` returns.

Before this object was introduced, workers only signaled a bare
``threading.Event`` on both success and failure, so the manager forwarded
failed requests to the router with missing embeddings (2026-05-09 incident).
"""

__slots__ = ("event", "success", "error")

def __init__(self):
self.event = threading.Event()
self.success = False
self.error: str = ""

def mark_success(self):
self.success = True
self.event.set()

def mark_failure(self, error: str = ""):
# success stays False; record the reason so manager logs are useful.
self.error = error or self.error or "unknown"
self.event.set()

def wait(self, timeout):
return self.event.wait(timeout)


class VisualManager:
def __init__(
self,
Expand All @@ -48,8 +83,24 @@ def __init__(

self.zmq_recv_socket = context.socket(zmq.PULL)
self.zmq_recv_socket.bind(f"{args.zmq_mode}127.0.0.1:{args.visual_port}")
self.cache_client = rpyc.connect("localhost", args.cache_port, config={"allow_pickle": True})
# sync_request_timeout 让阻塞的 RPyC 调用 (get_items_embed/set_items_embed) 从 socket
# 层真正抛 TimeoutError, 避免泄漏 default executor 线程导致 net-io 饿死。
cache_rpyc_timeout = max(
int(getattr(args, "visual_get_items_embed_timeout", 0) or 0),
int(getattr(args, "visual_set_items_embed_timeout", 0) or 0),
10,
)
self.cache_client = rpyc.connect(
"localhost",
args.cache_port,
config={"allow_pickle": True, "sync_request_timeout": cache_rpyc_timeout},
)
self.cache_client._channel.stream.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# 专用 executor: 同步 cache RPC 都走这个池, 与 asyncio default executor 隔离,
# 即便某次 cache 调用 hang (sync_request_timeout 兜底前) 也不会饿死 zmq recv_pyobj。
self._cache_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix="visual_cache_rpc"
)
Comment on lines +101 to +103

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

RPyC connections are not thread-safe. Since self.cache_client is called from threads in self._cache_executor (which has max_workers=2), concurrent requests can call get_items_embed concurrently, leading to protocol corruption or connection hangs. We should initialize a lock to serialize all RPyC calls on self.cache_client.

Suggested change
self._cache_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix="visual_cache_rpc"
)
self._cache_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=2, thread_name_prefix='visual_cache_rpc'
)
self._cache_lock = threading.Lock()

self.model_weightdir = args.model_dir
self.vit_dp = args.visual_dp
self.vit_tp = args.visual_tp
Expand All @@ -60,12 +111,10 @@ def __init__(
self.lock = asyncio.Lock()

async def wait_to_model_ready(self):

self.model_rpcs: List[List[VisualModelRpcClient]] = [[] for _ in range(self.vit_dp)]
self.vit_attn_backend = init_vit_att_backend(index=0)
for dp_rank_id in range(self.vit_dp):
for tp_rank_id in range(self.vit_tp):

rpc_model = await start_model_process()
self.model_rpcs[dp_rank_id].append(rpc_model)

Expand All @@ -92,6 +141,12 @@ async def wait_to_model_ready(self):
return

def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[ImageItem]:
"""同步路径: 检查 shm req 是否 aborted, 必要时调用 embed cache 询问哪些 image 已就绪。

``cache_client.root.get_items_embed`` 是同步 RPyC 调用; 调用方需要包一层
``asyncio.to_thread`` 并加超时, 避免 embed cache 卡死时阻塞 asyncio 事件循环
(2026-05-09 incident)。
"""
Comment on lines 143 to +149

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

To ensure thread safety when calling self.cache_client.root.get_items_embed concurrently from multiple executor threads, please wrap the RPyC call inside get_need_infer_images with the newly introduced self._cache_lock:

with self._cache_lock:
    ready_image = obtain(self.cache_client.root.get_items_embed(img_uuids))

shm_req = self.shm_req_manager.get_req_obj_by_index(group_req_indexes.shm_req_indexes[0])
is_aborted = shm_req.is_aborted
disable_prompt_cache = shm_req.sample_params.disable_prompt_cache
Expand Down Expand Up @@ -122,15 +177,37 @@ def get_need_infer_images(self, group_req_indexes: GroupReqIndexes) -> List[Imag
return images_need_infer

async def handle_group_indexes(self, group_req_indexes: GroupReqIndexes):
images_need_infer = self.get_need_infer_images(group_req_indexes)
# 把 get_need_infer_images 也放在 try 内, 并通过 dedicated executor + wait_for 加超时,
# 因为内部包含同步 RPyC 调用 (cache_client.root.get_items_embed); embed cache 卡死时
# 不应阻塞 asyncio 事件循环或绕过 abort 路径 (2026-05-09 incident)。
# 关键: 用 _cache_executor 而不是 default executor, 否则卡死的 cache 调用会占满
# default executor 进而饿死 loop_for_netio_req 里的 zmq_recv_socket.recv_pyobj。
cache_timeout = max(int(getattr(self.args, "visual_get_items_embed_timeout", 0) or 0), 0) or 10
loop = asyncio.get_running_loop()
try:
images_need_infer = await asyncio.wait_for(
loop.run_in_executor(self._cache_executor, self.get_need_infer_images, group_req_indexes),
timeout=cache_timeout,
)

if len(images_need_infer) == 0:
self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
return

if len(images_need_infer) == 0:
self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
return
else:
await self.handle_images(images_need_infer)
self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
return
except Exception as e:
# visual 推理失败 (例如 worker 异常 / event 等待超时 / get_items_embed 卡住),
# 把请求标记为 aborted 再转发, 由下游 router 正常走 abort 释放路径。
logger.exception(
f"handle_group_indexes failed, marking group_req_id={group_req_indexes.group_req_id} aborted: {e}"
)
for shm_req_index in group_req_indexes.shm_req_indexes:
shm_req = self.shm_req_manager.get_req_obj_by_index(shm_req_index)
shm_req.is_aborted = True
self.shm_req_manager.put_back_req_obj(shm_req)

self.send_to_next_module.send_pyobj(group_req_indexes, protocol=pickle.HIGHEST_PROTOCOL)
return

async def handle_images(self, images_need_infer: List[ImageItem]):
if not hasattr(self, "cur_dp_index"):
Expand All @@ -140,14 +217,14 @@ async def handle_images(self, images_need_infer: List[ImageItem]):
for image in images_need_infer:
self.cur_dp_index += 1
select_dp = self.cur_dp_index % self.vit_dp
dp_to_handle_images[select_dp].append((image, threading.Event()))
dp_to_handle_images[select_dp].append((image, VisualInferResult()))

taskes = []
for dp_index in range(self.vit_dp):
_images = dp_to_handle_images[dp_index]
if _images:
taskes.append(
self.infer_images(dp_index, images=[e[0] for e in _images], events=[e[1] for e in _images])
self.infer_images(dp_index, images=[e[0] for e in _images], results=[e[1] for e in _images])
)

async with self.lock:
Expand All @@ -157,17 +234,39 @@ async def handle_images(self, images_need_infer: List[ImageItem]):
logger.exception(str(e))
raise e

# 等待推理通知已经 ok
# 等待每张图片各自的完成事件 + 检查成功状态。
# event.wait() 必须有 timeout, 否则 ViT worker 异常退出 / cache 卡死时, 这里
# 永远不会返回, 同时被 asyncio.to_thread 占用的 default executor 线程也会被耗尽
# (2026-05-09 incident)。
#
# 关键: 即使发现有失败 image, 也要先把同 batch 中其他 image 都等到 (success 或 timeout),
# 再统一抛出。否则当一张 preprocess_failed 的图片先触发 mark_failure 时, 我们立刻
# raise → 上层走 abort → 下游释放 multimodal cache id, 但 store_worker 此刻还在
# 给同 batch 中成功的 image 写 embedding, 造成写到已释放槽位的竞态。
wait_timeout = max(int(getattr(self.args, "visual_infer_timeout", 0) or 0), 0) or 120
errors: List[str] = []
for dp_index in range(self.vit_dp):
_images = dp_to_handle_images[dp_index]
if _images:
await asyncio.to_thread(_images[-1][1].wait)
for img, result in _images:
ok = await asyncio.to_thread(result.wait, wait_timeout)
if not ok:
errors.append(
f"timeout dp={dp_index} uuid={img.uuid} "
f"md5={getattr(img, 'md5', None)} timeout={wait_timeout}s"
)
continue
if not result.success:
errors.append(
f"failed dp={dp_index} uuid={img.uuid} " f"md5={getattr(img, 'md5', None)} error={result.error}"
)
if errors:
raise RuntimeError("visual infer batch had failures: " + "; ".join(errors))
return

async def infer_images(self, dp_index: int, images, events):
async def infer_images(self, dp_index: int, images, results):
taskes = []
for vit_tp_rank in range(self.vit_tp):
task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, events)
task = self.model_rpcs[dp_index][vit_tp_rank].run_task(images, results)
taskes.append(task)
await asyncio.gather(*taskes)

Expand Down
Loading
Loading