Skip to content

cozy-creator/python-gen-worker

Repository files navigation

gen-worker

Python SDK for writing endpoints that run on Cozy's worker pool. You write one decorated function or class; the SDK handles discovery, scheduling, model download + placement, cancellation, file I/O, streaming, and reporting back to the control plane.

Install

pip install gen-worker[torch]   # for PyTorch inference/training
pip install gen-worker          # plain Python (e.g. API-proxy endpoints)

Optional extras: [images] for image I/O, [audio] for audio I/O, [trainer] for trainer-class endpoints.

Hello world

pyproject.toml — the one config value:

[tool.gen_worker]
main = "myendpoint.main"

main.py:

import msgspec
from gen_worker import RequestContext, endpoint

class Input(msgspec.Struct):
    prompt: str

class Output(msgspec.Struct):
    text: str

@endpoint
def echo(ctx: RequestContext, payload: Input) -> Output:
    return Output(text=f"got: {payload.prompt}")

Run it locally, no orchestrator:

gen-worker run --payload '{"prompt": "hello"}'

cozyctl endpoint deploy (or the platform UI) takes it from here.

Adding a model

Hold state in a class: setup() runs once, every public method is one routable function. The worker downloads the binding, constructs the pipeline from the setup() annotation, and owns device placement + low-VRAM offload — endpoint code never touches .to("cuda") or offload config.

from diffusers import StableDiffusionXLPipeline
from gen_worker import HF, RequestContext, Resources, endpoint

@endpoint(
    model=HF("stabilityai/stable-diffusion-xl-base-1.0", dtype="bf16"),
    resources=Resources(vram_gb=12),
)
class Generate:
    def setup(self, pipe: StableDiffusionXLPipeline) -> None:
        self.pipe = pipe

    def generate(self, ctx: RequestContext, payload: Input) -> Output:
        image = self.pipe(payload.prompt, generator=ctx.generator(42)).images[0]
        return Output(text=ctx.save_image(image).ref)

Bindings: HF(id, revision=, dtype=, subfolder=, files=), Hub(ref, tag=, flavor=), Civitai(id, version=), ModelScope(id, ...). The slot name comes from the models={} key or the setup() parameter — never a constructor argument.

Multi-variant endpoints (bf16/fp8/... checkpoints with different VRAM envelopes) declare variants={name: (binding, Resources)} — one handler body, one routable function per variant. Streaming = an async-generator handler. Engine-hosted endpoints declare runtime="vllm" and get a booted, health-checked server subprocess injected into setup().

Full reference: docs/endpoint-authoring.md.

Public surface

  • The decorator + bindings: endpoint, Resources, HF, Hub, Civitai, ModelScope
  • Contexts: RequestContext (≤15 members), ConversionContext, DatasetContext, TrainingContext
  • Errors: ValidationError, RetryableError, CanceledError, FatalError
  • Streaming: BatchItemDelta, IncrementalTokenDelta, Done, Error
  • Value types: Asset, ImageAsset, AudioAsset, VideoAsset
  • I/O codecs: gen_worker.io

Training lives in gen_worker.trainer. The conversion ETL (hub ingest, dtype cast / quant, clone, Tensorhub publish) is the separate cozy-convert workspace package.

Local development

gen-worker run --payload '{"prompt": "hello"}'  # one-shot in-process
gen-worker run --list                            # describe functions (JSON)
gen-worker serve                                 # warm local server
gen-worker invoke <fn> prompt=hello              # client for serve
gen-worker prefetch                              # weights only, no GPU

stdout for results, stderr for events; exit 0 / 1 / 2 / 3 / 130 for success / user-exception / usage / model-resolution / SIGINT. Details: docs/local-dev.md; host contract: docs/host-integration.md.

Running tests

uv run --extra dev pytest

Plain uv run pytest would fall through to a global launcher — always pass --extra dev. Never pip install gen-worker globally: a stale ~/.local install silently shadows the working tree (tests/conftest.py hard-fails if gen_worker resolves outside src/).

Documentation

Examples

  • examples/marco-polo/ — minimal inference endpoint (sync, async, streaming)
  • examples/training-smoke/ — minimal trainer

About

A collection of worker runtimes (with Dockerfiles) + worker functions, to be used by the gen-orchestrator.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages