Merged
27 changes: 27 additions & 0 deletions agents/completed.md
@@ -8,6 +8,33 @@

---

# #377: HF binding files= not honored on every download path (14GB instead of ~3GB)

**Completed:** yes
**Status:** DONE (2026-07-04, Claude) — root-cause fix: `Executor.__init__` registers every endpoint binding by wire ref on the `ModelStore` (`register_binding`), and `ModelStore.ensure_local` falls back to that index when called with a bare ref — so `files=`/provider now apply on the ModelOp DOWNLOAD/LOAD and lifecycle startup-prefetch paths, not just setup. Also found + fixed the fp32-pull selector bug: `select_hf_files` treated root monolithic checkpoints (sd1.5's untagged `v1-5-pruned*.safetensors`, 12GB) as their own weight group and "untagged" won for that group even though component dirs had fp16 — diffusers-layout repos now exclude root single-file checkpoints entirely. Boundary tests (no network) assert allow_patterns/provider at the download layer: tests/test_executor_prefetch_binding.py, tests/test_hf_selection.py.
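
The binding-by-ref fallback described above can be sketched in isolation. This is a minimal illustration, not the worker's code: `Binding` and `TinyStore` are hypothetical stand-ins for the real binding type and `ModelStore`, and the "download" is only recorded, never performed.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class Binding:
    """Hypothetical stand-in for an endpoint binding such as HF(...)."""
    ref: str
    provider: str = "hf"
    files: Tuple[str, ...] = ()


class TinyStore:
    """Toy store: one binding per ref, and a log of download requests."""

    def __init__(self) -> None:
        self._bindings: Dict[str, Binding] = {}
        self.downloads: List[dict] = []

    def register_binding(self, ref: str, binding: Binding) -> None:
        # First registration wins (dict.setdefault), as in the diff.
        self._bindings.setdefault(ref, binding)

    async def ensure_local(self, ref: str, binding: Optional[Binding] = None) -> dict:
        # Bare-ref callers fall back to the registered binding, if any.
        if binding is None:
            binding = self._bindings.get(ref)
        call = {
            "ref": ref,
            "provider": binding.provider if binding else None,
            "allow_patterns": binding.files if binding else (),
        }
        self.downloads.append(call)  # stands in for the real download layer
        return call


store = TinyStore()
store.register_binding(
    "owner/model", Binding("owner/model", files=("*.json", "*.fp16.safetensors"))
)
bare = asyncio.run(store.ensure_local("owner/model"))      # uses the registry
unknown = asyncio.run(store.ensure_local("acme/unbound"))  # nothing registered
```

An explicit `binding=` argument still takes precedence, because the registry is consulted only when the caller passes none.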

## Metadata
- Category: models
- Status: done

---

# #376: reserved-source materialization — ctx.source_path is never populated

**Completed:** yes
**Status:** DONE (2026-07-04, Claude) — `Executor._run_job` now extracts the reserved `source`/`destination` structs from producer-kind (non-inference) payloads, populates `ctx.source`/`ctx.destination`, and materializes `payload.source` through the same `ModelStore.ensure_local` path as model bindings (identical retry + RETRYABLE/INVALID classification + ModelEvent emission) before the handler runs — `ctx._set_source_path` finally has a caller. Missing `source` is a no-op (dataset producers like generate_prompt_corpus carry none); present-but-empty ref → INVALID. Deferred: live cast-dtype run against the hub (needs hub infra + GPU; training-endpoints #37 deferral stands). Tests: tests/test_executor_source_materialization.py.

## Tasks
- [x] Executor pre-invoke hook: detect reserved `source` in payload for producer kinds, ensure_local, set ctx.source_path
- [x] Failure classification + ModelEvent parity with model-binding downloads
- [x] Tests: producer endpoint with source ref gets a real local dir; missing/invalid source → INVALID not crash
- [ ] Verify training-endpoints cast-dtype runs live against the hub (their #37 deferral) — deferred, needs live hub + GPU

## Acceptance
A transform endpoint (e.g. cast-dtype) invoked with `source: {repo: owner/name:tag}` receives a materialized local snapshot at ctx.source_path.
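
The validation rules for the reserved `source` struct (absent means no-op, present with an empty ref means INVALID) can be sketched as below. `Payload`, `source_ref`, and this local `ValidationError` are hypothetical names for illustration; the `ref` key follows what `_materialize_source` reads in this PR.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


class ValidationError(Exception):
    """Hypothetical error type; the worker would report it as INVALID."""


@dataclass
class Payload:
    """Hypothetical producer payload; `source` is the reserved struct."""
    source: Optional[Dict[str, Any]] = None


def reserved_info(payload: Any, field_name: str) -> Dict[str, Any]:
    # Return the reserved struct as a plain dict, or {} when it is absent.
    obj = getattr(payload, field_name, None)
    return dict(obj) if isinstance(obj, dict) else {}


def source_ref(payload: Any) -> Optional[str]:
    info = reserved_info(payload, "source")
    if not info:
        return None  # no source: nothing to materialize
    ref = str(info.get("ref") or "").strip()
    if not ref:
        raise ValidationError("payload.source.ref must be a non-empty repo ref")
    return ref


no_source = source_ref(Payload())
with_source = source_ref(Payload(source={"ref": " owner/name:tag "}))
```

A caller would pass the returned ref to the store's `ensure_local` and hand the resulting directory to the request context before invoking the handler.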

---

# #369: residency registry must own the executor's pipelines — honest vram_bytes, no load races

**Completed:** yes
32 changes: 0 additions & 32 deletions agents/progress.md
@@ -11,38 +11,6 @@ next_id: 378

---

# #377: HF binding files= not honored on every download path (14GB instead of ~3GB)

**Completed:** no
**Status:** OPEN (2026-07-04, filed from e2e #105 J4) — the sd15-image example binds `HF("stable-diffusion-v1-5/stable-diffusion-v1-5", dtype="fp16", files=("*.json","*.txt","*.fp16.safetensors"))`, yet the worker's cold download fetched the FULL fp32 root checkpoints too (v1-5-pruned.safetensors 7.7GB + v1-5-pruned-emaonly.safetensors 4.3GB) — ~14GB on disk where ~3GB suffices. `Store.ensure_local` threads `allow_patterns=binding.files` on the setup path (executor.py:540→359), but at least one download path reaches `models/download.py` without the binding (ModelOp/prefetch path calls `store.ensure_local(ref, snap)` with no binding, executor.py:753) and falls back to `select_hf_files`' variant selector or the whole repo. Fix: the worker knows the binding for every ref in its own endpoint spec — resolve binding-by-ref once (registry) and apply its files/provider on ALL download paths. Also worth checking why the variant selector (if it ran) still took fp32 root weights when an fp16 group exists.

## Metadata
- Category: models
- Status: planned

---



# #376: reserved-source materialization — ctx.source_path is never populated

**Completed:** no
**Status:** OPEN — filed 2026-07-04 from training-endpoints #37 rewrite. `RequestContext.source_path` + `_set_source_path()` exist (request_context/__init__.py:770,777) but no caller: grep shows zero call sites in src/ or packages/. training-endpoints transform/quant producers (cast-dtype, convert-gguf, convert-flashpack, repackage-singlefile, torchao/bnb/modelopt quant) read `ctx.source_path` expecting the library to have materialized the payload's reserved `source` repo locally before invoke. Today they get None and fail.

Executor should, for producer-kind endpoints whose payload carries the reserved `source` struct: resolve the source ref → `ensure_local` (models/download.py, same snapshot path as model bindings) → `ctx._set_source_path(local_dir)` before the handler runs; download failures map to the same RETRYABLE/INVALID classification as model bindings. No new wire messages — the source ref resolves like any tensorhub ref (hub snapshot on RunJob or resolve-at-worker per CONTRACT §5).

## Tasks
- [ ] Executor pre-invoke hook: detect reserved `source` in payload for producer kinds, ensure_local, set ctx.source_path
- [ ] Failure classification + ModelEvent parity with model-binding downloads
- [ ] Tests: producer endpoint with source ref gets a real local dir; missing/invalid source → INVALID not crash
- [ ] Verify training-endpoints cast-dtype runs live against the hub (their #37 deferral)

## Acceptance
A transform endpoint (e.g. cast-dtype) invoked with `source: {repo: owner/name:tag}` receives a materialized local snapshot at ctx.source_path.

---


# #362: docs teach a deleted API — every README quickstart raises ImportError

**Completed:** no
50 changes: 49 additions & 1 deletion src/gen_worker/executor.py
@@ -80,6 +80,21 @@ def _sanitize(message: str) -> str:
     return out[:1024]


+def _reserved_repo_info(payload: Any, field_name: str) -> Dict[str, Any]:
+    """``payload.source`` / ``payload.destination`` as a plain dict ({} when
+    absent). Producer payloads carry these reserved-name structs (#376)."""
+    obj = getattr(payload, field_name, None)
+    if obj is None:
+        return {}
+    if isinstance(obj, dict):
+        return dict(obj)
+    try:
+        out = msgspec.to_builtins(obj)
+    except Exception:
+        return {}
+    return out if isinstance(out, dict) else {}
+
+
 def _map_exception(exc: BaseException) -> Tuple[int, str]:
     """-> (JobStatus, safe_message)."""
     if isinstance(exc, (CanceledError, asyncio.CancelledError)):
@@ -177,6 +192,7 @@ def __init__(
             on_event=self._on_residency_event, vram_budget_bytes=vram_budget_bytes,
         )
         self._locks: Dict[str, asyncio.Lock] = {}
+        self._bindings: Dict[str, Any] = {}
         self.keep: set[str] = set()
         self._loop: Optional[asyncio.AbstractEventLoop] = None
         self._index = disk_gc.RefIndex(self._cache_dir)
@@ -305,6 +321,12 @@ async def _ensure_disk_headroom(self, ref: str, needed_bytes: int) -> None:
     def _lock(self, ref: str) -> asyncio.Lock:
         return self._locks.setdefault(ref, asyncio.Lock())

+    def register_binding(self, ref: str, binding: Any) -> None:
+        """Endpoint-spec binding for ``ref`` — supplies files/provider on
+        download paths that only carry the bare ref (ModelOp, startup
+        prefetch), so ``files=`` selections apply everywhere (#377)."""
+        self._bindings.setdefault(ref, binding)
+
     async def ensure_local(
         self,
         ref: str,
@@ -314,8 +336,11 @@ async def ensure_local(
     ) -> Path:
         """Materialize `ref` on disk. Transient failures retry with backoff;
         terminal (4xx-class) failures raise immediately. Emits ModelEvents.
-        ``binding`` (when known) supplies provider + file-selection metadata."""
+        ``binding`` (when known) supplies provider + file-selection metadata;
+        bare-ref callers fall back to the registered endpoint binding."""
         self.bind_loop()
+        if binding is None:
+            binding = self._bindings.get(ref)
         async with self._lock(ref):
             cached = self.residency.local_path(ref)
             if cached is not None and cached.exists():
@@ -434,6 +459,9 @@ def __init__(
         self._send = send
         self._settings = settings
         self.store = store or ModelStore(send)
+        for s in specs:
+            for b in s.models.values():
+                self.store.register_binding(wire_ref(b), b)
         self._gpu_semaphore = asyncio.Semaphore(max(1, gpu_slots))
         # Model loads/promotions serialize so allocator-delta measurements
         # and free-VRAM reads don't cross-contaminate (#369).
@@ -934,6 +962,10 @@ async def _run_job(self, job: _Job, run: pb.RunJob) -> None:
         gpu_index = int(compute.gpu_index) if compute is not None else 0
         timeout_ms = int(run.timeout_ms or 0) or int(spec.timeout_ms or 0)

+        producer = spec.kind != "inference"
+        source_info = _reserved_repo_info(payload, "source") if producer else {}
+        destination_info = _reserved_repo_info(payload, "destination") if producer else {}
+
         ctx_cls = _CONTEXT_BY_KIND.get(spec.kind, RequestContext)
         ctx = ctx_cls(
             request_id=run.request_id,
@@ -949,6 +981,8 @@ async def _run_job(self, job: _Job, run: pb.RunJob) -> None:
                 gpu_count=int(compute.gpu_count) if compute is not None else 0,
             ),
             models={b.slot: b.ref for b in run.models},
+            source_info=source_info,
+            destination_info=destination_info,
             execution_hints=(
                 {"output_format": "inline"} if run.output_mode == pb.OUTPUT_MODE_INLINE else {}
             ),
@@ -957,6 +991,8 @@ async def _run_job(self, job: _Job, run: pb.RunJob) -> None:
         job.ctx = ctx

         try:
+            if source_info:
+                await self._materialize_source(ctx, source_info, snapshots)
             instance = await self.ensure_setup(spec, snapshots)
             kwargs = await self._handler_kwargs(spec, snapshots)
         except asyncio.CancelledError:
@@ -1017,6 +1053,18 @@ async def _run_job(self, job: _Job, run: pb.RunJob) -> None:
             self._gpu_semaphore.release()
             self._maybe_idle()

+    async def _materialize_source(
+        self, ctx: Any, info: Dict[str, Any], snapshots: Dict[str, pb.Snapshot]
+    ) -> None:
+        """Reserved-source contract (#376): materialize ``payload.source``
+        locally before the handler runs. Same ModelStore path as model
+        bindings — identical retry/classification and ModelEvent emission."""
+        ref = str(info.get("ref") or "").strip()
+        if not ref:
+            raise ValidationError("payload.source.ref must be a non-empty repo ref")
+        path = await self.store.ensure_local(ref, snapshots.get(ref))
+        ctx._set_source_path(str(path))
+
     async def _handler_kwargs(
         self, spec: EndpointSpec, snapshots: Dict[str, pb.Snapshot]
     ) -> Dict[str, Any]:
11 changes: 9 additions & 2 deletions src/gen_worker/models/download.py
@@ -267,8 +267,9 @@ def select_hf_files(
     """Small variant selector: which repo files to download.

     - Diffusers-style repos (component dirs): every non-weight file (configs,
-      tokenizers) plus ONE weight set per directory — the requested ``flavor``
-      when present, else bf16 > fp16 > untagged, safetensors preferred.
+      tokenizers) plus ONE weight set per component directory — the requested
+      ``flavor`` when present, else bf16 > fp16 > untagged, safetensors
+      preferred. Root monolithic checkpoints are excluded (redundant).
     - Root-weights repos: the root weight files (same variant rule) + sidecars.
     - Anything else: None (download the whole repo).
     """
@@ -295,6 +296,12 @@ def select_hf_files(
     if not diffusers_like and set(weights_by_dir) != {""}:
         return None  # unrecognized layout: full repo

+    if diffusers_like and "" in weights_by_dir and len(weights_by_dir) > 1:
+        # Root single-file distributions (sd1.5's v1-5-pruned*.safetensors)
+        # duplicate the component-dir weights — diffusers loads from the
+        # component dirs, so never pull the monolithic root checkpoints.
+        del weights_by_dir[""]
+
     def _pick(group: list[str]) -> list[str]:
         st = [f for f in group if f.lower().endswith(".safetensors")]
         pool = st or group
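
The root-checkpoint exclusion in the hunk above can be illustrated with a small standalone sketch. `group_weights_by_dir`, `drop_root_checkpoints`, and the `WEIGHT_SUFFIXES` tuple are hypothetical helpers written for this example; the real `select_hf_files` signature and its layout detection are not shown on this page, so `diffusers_like` is simply passed in.

```python
from collections import defaultdict
from pathlib import PurePosixPath
from typing import Dict, List

# Assumption: which suffixes count as weight files in this toy version.
WEIGHT_SUFFIXES = (".safetensors", ".bin", ".ckpt")


def group_weights_by_dir(files: List[str]) -> Dict[str, List[str]]:
    """Group weight files by parent directory; "" is the repo root."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in files:
        if name.lower().endswith(WEIGHT_SUFFIXES):
            parent = str(PurePosixPath(name).parent)
            groups["" if parent == "." else parent].append(name)
    return dict(groups)


def drop_root_checkpoints(
    groups: Dict[str, List[str]], diffusers_like: bool
) -> Dict[str, List[str]]:
    """Drop the root group when component directories also hold weights."""
    out = dict(groups)
    if diffusers_like and "" in out and len(out) > 1:
        del out[""]
    return out


repo_files = [
    "model_index.json",
    "v1-5-pruned.safetensors",
    "v1-5-pruned-emaonly.safetensors",
    "unet/diffusion_pytorch_model.fp16.safetensors",
    "vae/diffusion_pytorch_model.fp16.safetensors",
]
kept = drop_root_checkpoints(group_weights_by_dir(repo_files), diffusers_like=True)
```

A repo whose only weights sit at the root keeps its root group, since `len(out) > 1` is false there.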
117 changes: 117 additions & 0 deletions tests/test_executor_prefetch_binding.py
@@ -0,0 +1,117 @@
"""#377: binding files=/provider must apply on EVERY download path.

The ModelOp/startup-prefetch paths only carry the bare ref; the executor
registers each endpoint binding by wire ref so ModelStore.ensure_local can
recover files/provider at the download-layer boundary (no network)."""

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Any, Dict, List

import msgspec

import gen_worker.executor as executor_mod
from gen_worker.api.binding import HF
from gen_worker.executor import Executor
from gen_worker.pb import worker_scheduler_pb2 as pb
from gen_worker.registry import EndpointSpec


class _In(msgspec.Struct):
    x: str


class _Out(msgspec.Struct):
    y: str


_BINDING = HF(
    "stable-diffusion-v1-5/stable-diffusion-v1-5",
    dtype="fp16",
    files=("*.json", "*.txt", "*.fp16.safetensors"),
)
_REF = "stable-diffusion-v1-5/stable-diffusion-v1-5"


def _spec() -> EndpointSpec:
    class Endpoint:
        def setup(self, model: str) -> None:  # pragma: no cover
            pass

        def run(self, ctx, payload: _In) -> _Out:  # pragma: no cover
            return _Out(y=payload.x)

    return EndpointSpec(
        name="sd15", method=Endpoint.run, kind="inference",
        payload_type=_In, output_mode="single", cls=Endpoint,
        attr_name="run", models={"model": _BINDING},
    )


async def _noop_send(msg: pb.WorkerMessage) -> None:
    pass


def _capture_download(monkeypatch, tmp_path: Path) -> List[Dict[str, Any]]:
    calls: List[Dict[str, Any]] = []

    async def _fake_ensure_local(ref: str, **kwargs: Any) -> Path:
        calls.append({"ref": ref, **kwargs})
        return tmp_path

    monkeypatch.setattr(executor_mod, "ensure_local", _fake_ensure_local)
    return calls


def test_model_op_download_applies_binding_files(monkeypatch, tmp_path) -> None:
    calls = _capture_download(monkeypatch, tmp_path)

    async def _run() -> None:
        ex = Executor([_spec()], _noop_send)
        await ex.handle_model_op(pb.ModelOp(op=pb.MODEL_OP_KIND_DOWNLOAD, ref=_REF))

    asyncio.run(_run())
    assert len(calls) == 1
    assert calls[0]["ref"] == _REF
    assert calls[0]["provider"] == "hf"
    assert calls[0]["allow_patterns"] == _BINDING.files


def test_bare_ref_store_ensure_local_applies_binding_files(monkeypatch, tmp_path) -> None:
    # The lifecycle startup-prefetch shape: store.ensure_local(ref) with
    # nothing else.
    calls = _capture_download(monkeypatch, tmp_path)

    async def _run() -> None:
        ex = Executor([_spec()], _noop_send)
        await ex.store.ensure_local(_REF)

    asyncio.run(_run())
    assert calls[0]["provider"] == "hf"
    assert calls[0]["allow_patterns"] == _BINDING.files


def test_explicit_binding_still_wins(monkeypatch, tmp_path) -> None:
    calls = _capture_download(monkeypatch, tmp_path)
    override = HF(_REF, files=("only-this.safetensors",))

    async def _run() -> None:
        ex = Executor([_spec()], _noop_send)
        await ex.store.ensure_local(_REF, binding=override)

    asyncio.run(_run())
    assert calls[0]["allow_patterns"] == ("only-this.safetensors",)


def test_unknown_ref_downloads_without_patterns(monkeypatch, tmp_path) -> None:
    calls = _capture_download(monkeypatch, tmp_path)

    async def _run() -> None:
        ex = Executor([_spec()], _noop_send)
        await ex.store.ensure_local("acme/unbound")

    asyncio.run(_run())
    assert calls[0]["provider"] is None
    assert calls[0]["allow_patterns"] == ()