Skip to content

Commit 93cb8be

Browse files
committed
Limit worker pool to 10 processes with queued request handling
- Default VFBQUERY_WORKERS from CPU count to 10, limiting concurrent Neo4j connections and reducing 502 Bad Gateway bursts
- Default VFBQUERY_MAX_CONCURRENT from workers*4 to workers*2
- Add QueueTracker with active/waiting/total_served counters
- Log queue depth (active, waiting) on each incoming request
- Add GET /status endpoint for live queue monitoring
- Requests exceeding the pool size are queued and held open until a worker becomes available — no request is dropped
1 parent f5f94e0 commit 93cb8be

2 files changed

Lines changed: 104 additions & 15 deletions

File tree

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ EXPOSE 8080
1818
# Tuning via env vars:
1919
# VFBQUERY_PORT (default 8080)
2020
# VFBQUERY_HOST (default 0.0.0.0)
21-
# VFBQUERY_WORKERS (default: CPU count)
22-
# VFBQUERY_MAX_CONCURRENT (default: workers × 4)
21+
# VFBQUERY_WORKERS (default: 10)
22+
# VFBQUERY_MAX_CONCURRENT (default: workers × 2)
2323

2424
ENTRYPOINT ["python", "-m", "vfbquery.ha_api"]

src/vfbquery/ha_api.py

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,21 @@
22
VFBquery High-Availability API Server
33
44
Drop-in replacement for the V3 backend that serves VFBquery results
5-
over HTTP. Designed to handle hundreds of simultaneous long-running
6-
queries without timing out.
5+
over HTTP. Uses a bounded process pool (default: 10 workers) so that
6+
Neo4j is never hit by more than N simultaneous connections. Incoming
7+
requests that exceed the pool size are queued and held open until a
8+
worker becomes available.
79
810
Endpoints (mirrors v3-cached.virtualflybrain.org):
911
GET /get_term_info?id=<short_form>
1012
GET /run_query?id=<short_form>&query_type=<QueryType>
1113
GET /health
14+
GET /status — queue depth & worker utilisation
1215
1316
Usage:
14-
python -m vfbquery.ha_api # default: port 8080
17+
python -m vfbquery.ha_api # default: port 8080, 10 workers
1518
python -m vfbquery.ha_api --port 8080 --workers 8
16-
VFBQUERY_WORKERS=16 python -m vfbquery.ha_api
19+
VFBQUERY_WORKERS=10 python -m vfbquery.ha_api
1720
"""
1821

1922
import argparse
@@ -39,6 +42,48 @@
3942
)
4043
log = logging.getLogger("vfbquery.ha_api")
4144

45+
# Default number of worker processes — deliberately low to limit the number
46+
# of concurrent Neo4j connections. Override with VFBQUERY_WORKERS env var
47+
# or --workers CLI flag.
48+
DEFAULT_WORKERS = 10
49+
50+
51+
# ---------------------------------------------------------------------------
52+
# Queue tracker — keeps an atomic count of active + waiting requests so the
53+
# /status endpoint and log lines can report backpressure.
54+
# ---------------------------------------------------------------------------
55+
56+
class QueueTracker:
    """Atomic counters for requests that are waiting, executing, or finished.

    All mutation funnels through a single asyncio.Lock so concurrent
    handlers can never interleave mid-update. Reads via ``snapshot`` skip
    the lock: the event loop runs one coroutine at a time, so the three
    plain-int reads cannot be preempted.
    """

    def __init__(self):
        # active: requests currently running in the worker pool.
        # waiting: requests parked behind the semaphore.
        # total_served: lifetime count of completed requests.
        self._active = 0
        self._waiting = 0
        self._total_served = 0
        self._lock = asyncio.Lock()

    async def _shift(self, waiting=0, active=0, served=0):
        # Single locked mutation point shared by every state transition.
        async with self._lock:
            self._waiting += waiting
            self._active += active
            self._total_served += served

    async def enter_queue(self):
        """Record a newly-arrived request joining the wait queue."""
        await self._shift(waiting=1)

    async def leave_queue_start_work(self):
        """Promote one request from waiting to actively executing."""
        await self._shift(waiting=-1, active=1)

    async def finish_work(self):
        """Record an active request completing (success or failure)."""
        await self._shift(active=-1, served=1)

    @property
    def snapshot(self):
        """Current counters as a plain dict — safe for logging / JSON."""
        return {
            "active": self._active,
            "waiting": self._waiting,
            "total_served": self._total_served,
        }
86+
4287
# ---------------------------------------------------------------------------
4388
# Query-type → VFBquery function mapping
4489
#
@@ -172,10 +217,17 @@ async def handle_get_term_info(request):
172217

173218
pool = request.app["pool"]
174219
sem = request.app["semaphore"]
220+
tracker = request.app["tracker"]
175221

176-
log.info("get_term_info id=%s — queued", short_form)
222+
await tracker.enter_queue()
223+
snap = tracker.snapshot
224+
log.info(
225+
"get_term_info id=%s — queued (active=%d waiting=%d)",
226+
short_form, snap["active"], snap["waiting"],
227+
)
177228
try:
178229
async with sem:
230+
await tracker.leave_queue_start_work()
179231
loop = asyncio.get_event_loop()
180232
result = await loop.run_in_executor(pool, _run_term_info, short_form)
181233
log.info("get_term_info id=%s — done", short_form)
@@ -187,6 +239,8 @@ async def handle_get_term_info(request):
187239
{"error": f"Query failed for id={short_form}", "detail": tb},
188240
status=500,
189241
)
242+
finally:
243+
await tracker.finish_work()
190244

191245

192246
async def handle_run_query(request):
@@ -215,10 +269,17 @@ async def handle_run_query(request):
215269

216270
pool = request.app["pool"]
217271
sem = request.app["semaphore"]
272+
tracker = request.app["tracker"]
218273

219-
log.info("run_query id=%s query_type=%s — queued", short_form, query_type)
274+
await tracker.enter_queue()
275+
snap = tracker.snapshot
276+
log.info(
277+
"run_query id=%s query_type=%s — queued (active=%d waiting=%d)",
278+
short_form, query_type, snap["active"], snap["waiting"],
279+
)
220280
try:
221281
async with sem:
282+
await tracker.leave_queue_start_work()
222283
loop = asyncio.get_event_loop()
223284
result = await loop.run_in_executor(
224285
pool, _run_query, short_form, func_name
@@ -236,13 +297,31 @@ async def handle_run_query(request):
236297
"detail": tb},
237298
status=500,
238299
)
300+
finally:
301+
await tracker.finish_work()
239302

240303

241304
async def handle_health(request):
    """GET /health — liveness probe; always answers 200 with a trivial body."""
    payload = {"status": "ok"}
    return web.json_response(payload)
244307

245308

309+
async def handle_status(request):
    """GET /status — report queue depth and worker utilisation as JSON.

    Combines the static pool configuration stored on the app with the
    live counters from the QueueTracker snapshot.
    """
    app = request.app
    counters = app["tracker"].snapshot
    body = {
        "status": "ok",
        "workers": app["max_workers"],
        "max_concurrent": app["max_concurrent"],
    }
    # Fold in the live counters (active / waiting / total_served).
    body.update(
        active=counters["active"],
        waiting=counters["waiting"],
        total_served=counters["total_served"],
    )
    return web.json_response(body)
323+
324+
246325
# ---------------------------------------------------------------------------
247326
# Application factory
248327
# ---------------------------------------------------------------------------
@@ -252,20 +331,29 @@ def create_app(max_workers=None, max_concurrent=None):
252331
Build the aiohttp Application.
253332
254333
Args:
255-
max_workers: number of OS processes in the pool (default: CPU count)
256-
max_concurrent: max queries executing at once (default: workers × 4)
334+
max_workers: number of OS processes in the pool (default: 10)
335+
max_concurrent: max queries executing at once (default: workers × 2)
336+
337+
Requests beyond max_concurrent are queued in the async event loop
338+
and held open until a worker becomes available — no request is
339+
dropped or timed out by the queue itself.
257340
"""
258341
if max_workers is None:
259-
max_workers = int(os.getenv("VFBQUERY_WORKERS", os.cpu_count() or 4))
342+
max_workers = int(os.getenv("VFBQUERY_WORKERS", DEFAULT_WORKERS))
260343
if max_concurrent is None:
261-
max_concurrent = int(os.getenv("VFBQUERY_MAX_CONCURRENT", max_workers * 4))
344+
max_concurrent = int(os.getenv("VFBQUERY_MAX_CONCURRENT", max_workers * 2))
262345

263346
app = web.Application()
264347

265348
# Routes
266349
app.router.add_get("/get_term_info", handle_get_term_info)
267350
app.router.add_get("/run_query", handle_run_query)
268351
app.router.add_get("/health", handle_health)
352+
app.router.add_get("/status", handle_status)
353+
354+
# Store config for /status
355+
app["max_workers"] = max_workers
356+
app["max_concurrent"] = max_concurrent
269357

270358
async def on_startup(app):
271359
log.info(
@@ -276,6 +364,7 @@ async def on_startup(app):
276364
max_workers=max_workers, initializer=_init_worker
277365
)
278366
app["semaphore"] = asyncio.Semaphore(max_concurrent)
367+
app["tracker"] = QueueTracker()
279368

280369
async def on_cleanup(app):
281370
app["pool"].shutdown(wait=False)
@@ -305,13 +394,13 @@ def main():
305394
)
306395
parser.add_argument(
307396
"--workers", type=int,
308-
default=int(os.getenv("VFBQUERY_WORKERS", os.cpu_count() or 4)),
309-
help="Number of worker processes (default: CPU count)",
397+
default=int(os.getenv("VFBQUERY_WORKERS", DEFAULT_WORKERS)),
398+
help=f"Number of worker processes (default: {DEFAULT_WORKERS})",
310399
)
311400
parser.add_argument(
312401
"--max-concurrent", type=int,
313402
default=None,
314-
help="Max concurrent queries (default: workers × 4)",
403+
help="Max concurrent queries (default: workers × 2)",
315404
)
316405
args = parser.parse_args()
317406

0 commit comments

Comments
 (0)