11import asyncio
2+ import datetime
23import glob
34import inspect
45import json
1415from typing import Optional
1516
1617import pkg_resources
17- from fastapi import FastAPI , Form , File , Request , Depends , BackgroundTasks , HTTPException
18+ from fastapi import FastAPI , Form , File , Request , Depends , HTTPException
1819from fastapi .responses import HTMLResponse , FileResponse
1920from fastapi .responses import RedirectResponse , JSONResponse
2021from fastapi .templating import Jinja2Templates
2122from fastapi .websockets import WebSocket
2223from pydantic import BaseModel , Field , ValidationError
2324from websockets .exceptions import ConnectionClosed
2425
25- THREADS = int (os .environ .get ('CUSTOMIZER_THREADS' , 10 ))
26+ THREADS = int (os .environ .get ('CUSTOMIZER_THREADS' , 1 ))
2627TIMEOUT = int (os .environ .get ('CUSTOMIZER_JOB_TIMEOUT' , 1200 ))
28+ QUEUESIZE = int (os .environ .get ('CUSTOMIZER_QUEUE_LENGTH' , 20 ))
29+ TEMPLATEDIR = pkg_resources .resource_filename (__name__ , 'templates' )
2730
2831# TODO: make this configurable
2932ALL_PARTS = [
4548 "MainCase/MainCaseLid" ,
4649 "MainCase/UsbCover" ,
4750]
51+
4852ALL_PARTS .extend (
4953 [f"logo/OpenBikeSensor/MainCase{ l } -{ inv } -{ mn } "
5054 for l in ["" , "Lid" ]
5155 for inv in ["inverted" , "normal" ]
5256 for mn in ["main" , "highlight" ]])
5357
54- #ALL_PARTS = ALL_PARTS[:3] # for debugging
58+ # ALL_PARTS = ALL_PARTS[:3] # for debugging
5559
5660queue = asyncio .Queue (maxsize = 20 )
5761app = FastAPI ()
58- TEMPLATEDIR = pkg_resources .resource_filename (__name__ , 'templates' )
5962
6063templates = Jinja2Templates (directory = TEMPLATEDIR )
6164
65+ job_durations = [200 , 200 , 200 ]
66+
6267
6368def field_type (entry , default_value ):
6469 if entry in ["main_case_logo_svg" , "main_case_lid_logo_svg" ]:
@@ -82,6 +87,19 @@ def models(root):
8287MODEL_ROOT = (ROOT / "src" ).resolve ()
8388
8489
90+ async def queue_runner ():
91+ while True :
92+ fun , arg = await queue .get ()
93+ logging .info (f"running { arg } " )
94+ await fun (arg )
95+
96+
97+ @app .on_event ('startup' )
98+ async def app_startup ():
99+ asyncio .create_task (queue_runner ())
100+ asyncio .create_task (queue_runner ())
101+
102+
85103def scadify (value ):
86104 if isinstance (value , bool ):
87105 return str (value ).lower ()
@@ -156,6 +174,9 @@ def package_to_zip(source: Path, target: Path):
156174
157175
158176async def run_job (uid , parts = None ):
177+ global job_durations
178+ starttime = datetime .datetime .now ()
179+
159180 if parts == None :
160181 parts = ALL_PARTS
161182 dir_to_work = Path (tempfile .gettempdir ()) / uid
@@ -210,6 +231,8 @@ async def run_job(uid, parts=None):
210231 write_info_json ()
211232
212233 raise
234+ job_durations = job_durations [- 2 :]
235+ job_durations .append ((datetime .datetime .now () - starttime ).total_seconds ())
213236
214237
215238async def copy_build_files_to_build_dir (dir_to_work : Path , build_dir : Path , use_custom_logo : bool ):
@@ -342,7 +365,6 @@ async def jobstate(websocket: WebSocket, uid: uuid.UUID):
342365
343366@app .post ("/job" )
344367async def form_post (request : Request ,
345- background_tasks : BackgroundTasks ,
346368 main_case_logo_svg : Optional [bytes ] = File (None ),
347369 main_case_lid_logo_svg : Optional [bytes ] = File (None ),
348370 variables : CustomVariables = Depends (CustomVariables .as_form )):
@@ -361,8 +383,17 @@ async def form_post(request: Request,
361383 if len (main_case_logo_svg ) == 0 and len (main_case_lid_logo_svg ) == 0 :
362384 variables .use_custom_logo = False
363385 variables_json_file = work_dir / "variables.json"
386+
387+ open (work_dir / "log.txt" , "w" ).write (
388+ f"job queued, approx wait time to start { (sum (job_durations ) / len (job_durations )) * max (1 , queue .qsize () - 2 )} seconds\n " )
389+ open (work_dir / "info.json" , "w" ).write (json .dumps ({"parts" : ["none" ], "status" : "queued" , }))
390+
364391 variables_json_file .open ("w" ).write (variables .json ())
365- background_tasks .add_task (run_job , uid )
392+
393+ try :
394+ queue .put_nowait ((run_job , uid ))
395+ except asyncio .QueueFull :
396+ return HTTPException (status_code = 503 , detail = "Queue is full, please try again later" )
366397
367398 if "Accept" in request .headers and request .headers ["Accept" ] == "application/json" :
368399 return JSONResponse (RunningJob (uid = uid , ** variables .dict ()).dict ())
0 commit comments