Python SDK for writing endpoints that run on Cozy's worker pool. You write one decorated function or class; the SDK handles discovery, scheduling, model download + placement, cancellation, file I/O, streaming, and reporting back to the control plane.

    pip install gen-worker[torch]   # for PyTorch inference/training
    pip install gen-worker          # plain Python (e.g. API-proxy endpoints)

Optional extras: [images] for image I/O, [audio] for audio I/O, [trainer] for trainer-class endpoints.

pyproject.toml (the one config value):

    [tool.gen_worker]
    main = "myendpoint.main"

main.py:

    import msgspec
    from gen_worker import RequestContext, endpoint

    class Input(msgspec.Struct):
        prompt: str

    class Output(msgspec.Struct):
        text: str

    @endpoint
    def echo(ctx: RequestContext, payload: Input) -> Output:
        return Output(text=f"got: {payload.prompt}")

Run it locally, no orchestrator:

    gen-worker run --payload '{"prompt": "hello"}'

cozyctl endpoint deploy (or the platform UI) takes it from here.

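The one-shot run above amounts to a round trip: decode the JSON payload into the handler's input type, call the handler, encode the result. A minimal sketch of that round trip using only the standard library follows. Dataclasses stand in for msgspec.Struct, run_once is a hypothetical helper that is not part of gen_worker, and the RequestContext argument is left out for brevity.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Input:
    prompt: str


@dataclass
class Output:
    text: str


def echo(payload: Input) -> Output:
    # Same body as the endpoint above, minus the RequestContext argument.
    return Output(text=f"got: {payload.prompt}")


def run_once(handler, input_type, raw_payload: str) -> str:
    """Hypothetical helper: decode JSON, call the handler, encode the result."""
    fields = json.loads(raw_payload)
    payload = input_type(**fields)  # TypeError on missing or unknown fields
    result = handler(payload)
    return json.dumps(asdict(result))


if __name__ == "__main__":
    print(run_once(echo, Input, '{"prompt": "hello"}'))
```

Typing the payload means a malformed request fails at the decode step, before the handler body runs.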
Hold state in a class: setup() runs once, every public method is one
routable function. The worker downloads the binding, constructs the pipeline
from the setup() annotation, and owns device placement + low-VRAM offload —
endpoint code never touches .to("cuda") or offload config.

    from diffusers import StableDiffusionXLPipeline
    from gen_worker import HF, RequestContext, Resources, endpoint

    # Input and Output are the msgspec structs defined in main.py above.

    @endpoint(
        model=HF("stabilityai/stable-diffusion-xl-base-1.0", dtype="bf16"),
        resources=Resources(vram_gb=12),
    )
    class Generate:
        def setup(self, pipe: StableDiffusionXLPipeline) -> None:
            # Runs once; the worker passes in the constructed, placed pipeline.
            self.pipe = pipe

        def generate(self, ctx: RequestContext, payload: Input) -> Output:
            image = self.pipe(payload.prompt, generator=ctx.generator(42)).images[0]
            return Output(text=ctx.save_image(image).ref)

Bindings: HF(id, revision=, dtype=, subfolder=, files=), Hub(ref, tag=, flavor=), Civitai(id, version=), ModelScope(id, ...). The slot name comes from the models={} key or the setup() parameter, never from a constructor argument.
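
To illustrate how a slot name and pipeline class can be read from the setup() annotation, here is a minimal sketch using only inspect and typing. It is not the worker's actual implementation: discover_slots and FakePipeline are hypothetical names, and a real worker would load downloaded weights rather than call the class with no arguments.

```python
import inspect
from typing import get_type_hints


class FakePipeline:
    """Stand-in for a real pipeline class such as StableDiffusionXLPipeline."""


class Generate:
    def setup(self, pipe: FakePipeline) -> None:
        self.pipe = pipe


def discover_slots(endpoint_cls) -> dict:
    """Hypothetical helper: map each setup() parameter name to its annotated type."""
    hints = get_type_hints(endpoint_cls.setup)
    params = inspect.signature(endpoint_cls.setup).parameters
    return {name: hints[name] for name in params if name != "self" and name in hints}


slots = discover_slots(Generate)
instance = Generate()
# A real worker would build each object from downloaded weights; this sketch
# simply instantiates the annotated class and injects it by parameter name.
instance.setup(**{name: cls() for name, cls in slots.items()})
```

Because the parameter name doubles as the slot name, renaming the setup() parameter renames the slot, which is consistent with the slot never being passed as a constructor argument.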
Multi-variant endpoints (bf16/fp8/... checkpoints with different VRAM
envelopes) declare variants={name: (binding, Resources)} — one handler body,
one routable function per variant. Streaming = an async-generator handler.
Engine-hosted endpoints declare runtime="vllm" and get a booted,
health-checked server subprocess injected into setup().
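
For the streaming case, the handler shape is an ordinary Python async generator. The sketch below shows that shape without the SDK: TokenDelta and Finished are hypothetical stand-ins defined locally (the SDK's own streaming types and their fields are not shown here), and the RequestContext argument is omitted.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class TokenDelta:
    """Hypothetical stand-in for a streamed text chunk."""
    text: str


@dataclass
class Finished:
    """Hypothetical stand-in for an end-of-stream marker."""
    total_chunks: int


async def stream_words(prompt: str):
    # An async generator: the consumer receives each yielded event before the
    # generator resumes to compute the next one.
    words = prompt.split()
    for word in words:
        await asyncio.sleep(0)  # placeholder for real per-token work
        yield TokenDelta(text=word)
    yield Finished(total_chunks=len(words))


async def collect(prompt: str) -> list:
    # Drain the stream the way a caller would, one event at a time.
    return [event async for event in stream_words(prompt)]


events = asyncio.run(collect("marco polo"))
```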
Full reference: docs/endpoint-authoring.md.
- The decorator + bindings: endpoint, Resources, HF, Hub, Civitai, ModelScope
- Contexts: RequestContext (≤15 members), ConversionContext, DatasetContext, TrainingContext
- Errors: ValidationError, RetryableError, CanceledError, FatalError
- Streaming: BatchItemDelta, IncrementalTokenDelta, Done, Error
- Value types: Asset, ImageAsset, AudioAsset, VideoAsset
- I/O codecs: gen_worker.io

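
The error names suggest a split between failures worth retrying and failures that are not. The sketch below shows how a caller might act on that split. The two exception classes are local stand-ins defined for illustration, call_with_retries is a hypothetical helper, and the retry policy is an assumption rather than documented SDK behavior.

```python
class RetryableError(Exception):
    """Local stand-in: a transient failure that may succeed on retry."""


class FatalError(Exception):
    """Local stand-in: a failure that retrying will not fix."""


def call_with_retries(fn, max_attempts: int = 3):
    """Hypothetical helper: retry only on RetryableError, up to max_attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RetryableError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last transient error
    # Any other exception, including FatalError, propagates on first raise.


attempts = []


def flaky():
    # Fails twice with a transient error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise RetryableError("transient")
    return "ok"


result = call_with_retries(flaky)
```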
Training lives in gen_worker.trainer. The conversion ETL (hub ingest,
dtype cast / quant, clone, Tensorhub publish) is the separate
cozy-convert workspace package.

    gen-worker run --payload '{"prompt": "hello"}'   # one-shot in-process
    gen-worker run --list                            # describe functions (JSON)
    gen-worker serve                                 # warm local server
    gen-worker invoke <fn> prompt=hello              # client for serve
    gen-worker prefetch                              # weights only, no GPU

stdout for results, stderr for events; exit 0 / 1 / 2 / 3 / 130 for success / user-exception / usage / model-resolution / SIGINT. Details: docs/local-dev.md; host contract: docs/host-integration.md.
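
A script that wraps the CLI can branch on those exit codes. The following is a minimal sketch, assuming gen-worker is on PATH. describe_exit and run_endpoint are hypothetical helper names, and the code table is copied from the list above.

```python
import subprocess

# Exit-code meanings as listed above.
EXIT_MEANINGS = {
    0: "success",
    1: "user-exception",
    2: "usage",
    3: "model-resolution",
    130: "SIGINT",
}


def describe_exit(code: int) -> str:
    """Translate an exit code into its documented meaning."""
    return EXIT_MEANINGS.get(code, f"unknown ({code})")


def run_endpoint(payload_json: str) -> tuple[str, str]:
    """Run the one-shot CLI and return (meaning, stdout).

    Results arrive on stdout; events on stderr are captured but not returned.
    """
    proc = subprocess.run(
        ["gen-worker", "run", "--payload", payload_json],
        capture_output=True,
        text=True,
    )
    return describe_exit(proc.returncode), proc.stdout
```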

    uv run --extra dev pytest

Plain uv run pytest would fall through to a global launcher, so always pass --extra dev. Never pip install gen-worker globally: a stale ~/.local install silently shadows the working tree (tests/conftest.py hard-fails if gen_worker resolves outside src/).
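
A shadowing check of this kind can be written with importlib alone. The sketch below is not the repository's conftest.py; resolves_inside is a hypothetical helper that shows one way to test where a module would be imported from.

```python
import importlib.util
from pathlib import Path


def resolves_inside(module_name: str, root: Path) -> bool:
    """Return True if the importable module's file lives under root."""
    spec = importlib.util.find_spec(module_name)
    if spec is None or spec.origin is None or not spec.has_location:
        return False  # not importable, or no file on disk (e.g. built-in)
    origin = Path(spec.origin).resolve()
    return origin.is_relative_to(root.resolve())  # Python 3.9+
```

A conftest.py could call resolves_inside("gen_worker", Path("src")) at import time and raise if it returns False, so a stale install fails the run immediately instead of being tested by accident.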

- docs/endpoint-authoring.md: the @endpoint reference (bindings, variants, Resources, contexts, streaming, runtimes).
- docs/local-dev.md: the CLI (run / serve / invoke / prefetch, field=value grammar, --offline, exit codes).
- docs/dockerfile.md: bring-your-own-Dockerfile contract.
- docs/endpoint-envs.md: tenant envs/secrets.
- examples/marco-polo/: minimal inference endpoint (sync, async, streaming)
- examples/training-smoke/: minimal trainer