Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
.PHONY: test lint

test:
uv run --dev pytest

lint:
uv run --dev pre-commit run --all-files
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ headers of the SSE response yourself.

A datastar response consists of 0..N datastar events. There are response
classes included to make this easy in all of the supported frameworks.
Each framework also exposes a `@datastar_response` decorator that will wrap
return values (including generators) into the right response class while
preserving sync handlers as sync so frameworks can keep them in their
threadpools.

The following examples will work across all supported frameworks when the
response class is imported from the appropriate framework package.
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ urls.GitHub = "https://github.com/starfederation/datastar-python"
dev = [
"django>=4.2.23",
"fastapi>=0.116.1",
"httpx>=0.27",
"litestar>=2.17",
"pre-commit>=4.2",
"python-fasthtml>=0.12.25; python_full_version>='3.10'",
"quart>=0.20",
"sanic>=25.3",
"starlette>=0.47.3",
"uvicorn>=0.30",
]

[tool.ruff]
Expand Down Expand Up @@ -88,5 +90,6 @@ lint.ignore = [
"E501",
]
lint.per-file-ignores."examples/**/*.py" = [ "ANN", "DTZ005", "PLC0415" ]
lint.per-file-ignores."tests/**/*.py" = [ "ANN", "PLC0415", "PLR2004" ]
lint.fixable = [ "ALL" ]
lint.pylint.allow-magic-value-types = [ "int", "str" ]
25 changes: 18 additions & 7 deletions src/datastar_py/django.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Awaitable, Callable, Mapping
from functools import wraps
from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction
from typing import Any, ParamSpec

from django.http import HttpRequest
Expand Down Expand Up @@ -45,20 +46,30 @@ def __init__(

def datastar_response(
func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents],
) -> Callable[P, Awaitable[DatastarResponse]]:
) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]:
"""A decorator which wraps a function result in DatastarResponse.

Can be used on a sync or async function or generator function.
Preserves the sync/async nature of the decorated function.
"""
if iscoroutinefunction(func) or isasyncgenfunction(func):

@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
result = func(*args, **kwargs)
if isawaitable(result):
result = await result
return DatastarResponse(result)

async_wrapper.__annotations__["return"] = DatastarResponse
return async_wrapper

@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
r = func(*args, **kwargs)
if isinstance(r, Awaitable):
return DatastarResponse(await r)
return DatastarResponse(r)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(func(*args, **kwargs))

return wrapper
sync_wrapper.__annotations__["return"] = DatastarResponse
return sync_wrapper


def read_signals(request: HttpRequest) -> dict[str, Any] | None:
Expand Down
28 changes: 19 additions & 9 deletions src/datastar_py/litestar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Awaitable, Callable, Mapping
from functools import wraps
from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -64,21 +65,30 @@ def __init__(

def datastar_response(
func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents],
) -> Callable[P, Awaitable[DatastarResponse]]:
) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]:
"""A decorator which wraps a function result in DatastarResponse.

Can be used on a sync or async function or generator function.
Preserves the sync/async nature of the decorated function.
"""
if iscoroutinefunction(func) or isasyncgenfunction(func):

@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
result = func(*args, **kwargs)
if isawaitable(result):
result = await result
return DatastarResponse(result)

async_wrapper.__annotations__["return"] = DatastarResponse
return async_wrapper

@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
r = func(*args, **kwargs)
if isinstance(r, Awaitable):
return DatastarResponse(await r)
return DatastarResponse(r)

wrapper.__annotations__["return"] = DatastarResponse
return wrapper
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(func(*args, **kwargs))

sync_wrapper.__annotations__["return"] = DatastarResponse
return sync_wrapper


async def read_signals(request: Request) -> dict[str, Any] | None:
Expand Down
33 changes: 25 additions & 8 deletions src/datastar_py/quart.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from collections.abc import Awaitable, Callable, Mapping
from functools import wraps
from inspect import isasyncgen, isasyncgenfunction, isgenerator
from inspect import isasyncgen, isasyncgenfunction, iscoroutinefunction, isgenerator
from typing import Any, ParamSpec

from quart import Response, copy_current_request_context, request, stream_with_context
Expand Down Expand Up @@ -43,20 +43,37 @@ def __init__(

def datastar_response(
func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents],
) -> Callable[P, Awaitable[DatastarResponse]]:
) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]:
"""A decorator which wraps a function result in DatastarResponse.

Can be used on a sync or async function or generator function.
Preserves the sync/async nature of the decorated function.
"""
# Async generators require stream_with_context wrapping at decoration time
if isasyncgenfunction(func):

@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
if isasyncgenfunction(func):
@wraps(func)
async def async_gen_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(stream_with_context(func)(*args, **kwargs))
return DatastarResponse(await copy_current_request_context(func)(*args, **kwargs))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need the copy_current_request_context anymore?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restored in ca51009. copy_current_request_context binds the handler to a copy of the current request context so quart.request / g stay accessible across the internal await. Left the sync branch alone for minimal diff.


wrapper.__annotations__["return"] = DatastarResponse
return wrapper
async_gen_wrapper.__annotations__["return"] = DatastarResponse
return async_gen_wrapper

if iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(await copy_current_request_context(func)(*args, **kwargs))

async_wrapper.__annotations__["return"] = DatastarResponse
return async_wrapper

@wraps(func)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(func(*args, **kwargs))

sync_wrapper.__annotations__["return"] = DatastarResponse
return sync_wrapper


async def read_signals() -> dict[str, Any] | None:
Expand Down
4 changes: 2 additions & 2 deletions src/datastar_py/sanic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Awaitable, Callable, Collection, Mapping
from contextlib import aclosing, closing
from functools import wraps
from inspect import isasyncgen, isgenerator
from inspect import isasyncgen, isawaitable, isgenerator
from typing import Any, ParamSpec

from sanic import HTTPResponse, Request
Expand Down Expand Up @@ -70,7 +70,7 @@ def datastar_response(
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse | None:
r = func(*args, **kwargs)
if isinstance(r, Awaitable):
if isawaitable(r):
return DatastarResponse(await r)
if isasyncgen(r):
request = args[0]
Expand Down
28 changes: 19 additions & 9 deletions src/datastar_py/starlette.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from collections.abc import Awaitable, Callable, Mapping
from functools import wraps
from inspect import isasyncgenfunction, isawaitable, iscoroutinefunction
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -53,21 +54,30 @@ def __init__(

def datastar_response(
func: Callable[P, Awaitable[DatastarEvents] | DatastarEvents],
) -> Callable[P, Awaitable[DatastarResponse]]:
) -> Callable[P, Awaitable[DatastarResponse] | DatastarResponse]:
"""A decorator which wraps a function result in DatastarResponse.

Can be used on a sync or async function or generator function.
Preserves the sync/async nature of the decorated function.
"""
if iscoroutinefunction(func) or isasyncgenfunction(func):

@wraps(func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
result = func(*args, **kwargs)
if isawaitable(result):
result = await result
return DatastarResponse(result)

async_wrapper.__annotations__["return"] = DatastarResponse
return async_wrapper

@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
r = func(*args, **kwargs)
if isinstance(r, Awaitable):
return DatastarResponse(await r)
return DatastarResponse(r)

wrapper.__annotations__["return"] = DatastarResponse
return wrapper
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> DatastarResponse:
return DatastarResponse(func(*args, **kwargs))

sync_wrapper.__annotations__["return"] = DatastarResponse
return sync_wrapper


async def read_signals(request: Request) -> dict[str, Any] | None:
Expand Down
82 changes: 82 additions & 0 deletions tests/test_datastar_decorator_runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Runtime regression test for datastar_response: sync handlers must not stall the event loop."""

from __future__ import annotations

import threading
import time
from typing import Any

import anyio
import httpx
import pytest
import uvicorn
from starlette.applications import Starlette
from starlette.responses import PlainTextResponse
from starlette.routing import Route

from datastar_py.sse import ServerSentEventGenerator as SSE


@pytest.fixture
def anyio_backend() -> str:
"""Limit anyio plugin to asyncio backend for these tests."""
return "asyncio"


async def _fetch(
client: httpx.AsyncClient, path: str, timings: dict[str, float], key: str
) -> None:
start = time.perf_counter()
resp = await client.get(path, timeout=5.0)
timings[key] = time.perf_counter() - start
resp.raise_for_status()


@pytest.mark.anyio("asyncio")
async def test_sync_handler_runs_off_event_loop() -> None:
"""Sync routes should stay in the threadpool; otherwise they block the event loop."""
entered = threading.Event()

from datastar_py.starlette import datastar_response

@datastar_response
def slow(request) -> Any:
entered.set()
time.sleep(1.0) # if run on the event loop, this blocks other requests
return SSE.patch_signals({"slow": True})

async def ping(request) -> PlainTextResponse:
return PlainTextResponse("pong")

app = Starlette(routes=[Route("/slow", slow), Route("/ping", ping)])

config = uvicorn.Config(app, host="127.0.0.1", port=0, log_level="warning", lifespan="off")
server = uvicorn.Server(config)
thread = threading.Thread(target=server.run, daemon=True)
thread.start()

try:
# Wait for server to start and expose sockets
for _ in range(50):
if server.started and getattr(server, "servers", None):
break
await anyio.sleep(0.05)
else:
pytest.fail("Server did not start")

sock = server.servers[0].sockets[0]
host, port = sock.getsockname()[:2]
base_url = f"http://{host}:{port}"

async with httpx.AsyncClient(base_url=base_url) as client:
timings: dict[str, float] = {}
async with anyio.create_task_group() as tg:
tg.start_soon(_fetch, client, "/slow", timings, "slow")
await anyio.to_thread.run_sync(entered.wait, 1.0)
tg.start_soon(_fetch, client, "/ping", timings, "ping")

assert timings["slow"] >= 0.9
assert timings["ping"] < 0.3, "Ping should not be blocked by slow sync handler"
finally:
server.should_exit = True
thread.join(timeout=2)
Loading