diff --git a/lightx2v/server/__main__.py b/lightx2v/server/__main__.py index 06f2d2d4c..1c848abf6 100644 --- a/lightx2v/server/__main__.py +++ b/lightx2v/server/__main__.py @@ -13,6 +13,7 @@ def main(): parser.add_argument("--host", type=str, default="0.0.0.0", help="Server host") parser.add_argument("--port", type=int, default=8000, help="Server port") parser.add_argument("--max_queue_size", type=int, default=10, help="Maximum active tasks (pending + processing)") + parser.add_argument("--history_limit", type=int, default=1000, help="Maximum completed tasks to keep in history") args, unknown = parser.parse_known_args() diff --git a/lightx2v/server/config.py b/lightx2v/server/config.py index c7bfccbf2..84e3af826 100644 --- a/lightx2v/server/config.py +++ b/lightx2v/server/config.py @@ -12,7 +12,7 @@ class ServerConfig: max_queue_size: int = 10 task_timeout: int = 300 - task_history_limit: int = 1000 + history_limit: int = 1000 http_timeout: int = 30 http_max_retries: int = 3 @@ -41,6 +41,12 @@ def from_env(cls) -> "ServerConfig": except ValueError: logger.warning(f"Invalid max queue size: {env_queue_size}") + if env_history_limit := os.environ.get("LIGHTX2V_HISTORY_LIMIT"): + try: + config.history_limit = int(env_history_limit) + except ValueError: + logger.warning(f"Invalid task history limit: {env_history_limit}") + # MASTER_ADDR is now managed by torchrun, no need to set manually if env_cache_dir := os.environ.get("LIGHTX2V_CACHE_DIR"): @@ -59,6 +65,10 @@ def validate(self) -> bool: logger.error("task_timeout must be positive") valid = False + if self.history_limit < 0: + logger.error("history_limit must be >= 0") + valid = False + return valid diff --git a/lightx2v/server/main.py b/lightx2v/server/main.py index 853628830..9b7951e69 100644 --- a/lightx2v/server/main.py +++ b/lightx2v/server/main.py @@ -47,6 +47,11 @@ def _signal_handler(signum, frame): task_manager.set_max_queue_size(server_config.max_queue_size) logger.info(f"Task queue size set to {server_config.max_queue_size}") + if hasattr(args, "history_limit") and args.history_limit is not None: + server_config.history_limit = int(args.history_limit) + task_manager.set_history_limit(server_config.history_limit) + logger.info(f"Task history limit set to {server_config.history_limit}") + if not server_config.validate(): raise RuntimeError("Invalid server configuration") diff --git a/lightx2v/server/task_manager.py b/lightx2v/server/task_manager.py index 5c53ea096..5be41c03f 100644 --- a/lightx2v/server/task_manager.py +++ b/lightx2v/server/task_manager.py @@ -35,8 +35,9 @@ class TaskInfo: class TaskManager: - def __init__(self, max_queue_size: int = 100): + def __init__(self, max_queue_size: int = 100, history_limit: int = 1000): self.max_queue_size = max_queue_size + self.history_limit = history_limit self._tasks: OrderedDict[str, TaskInfo] = OrderedDict() self._lock = threading.RLock() @@ -94,6 +95,7 @@ def complete_task(self, task_id: str, save_result_path: Optional[str] = None, re task.end_time = datetime.now() task.save_result_path = save_result_path task.result_png = result_png + task.message = None self.completed_tasks += 1 self._emit_queue_metrics_unlocked() @@ -231,15 +233,21 @@ def set_max_queue_size(self, max_queue_size: int): self.max_queue_size = max_queue_size self._emit_queue_metrics_unlocked() - def _cleanup_old_tasks(self, keep_count: int = 1000): - if len(self._tasks) <= keep_count: + def set_history_limit(self, history_limit: int): + if history_limit < 0: + raise ValueError("history_limit must be >= 0") + with self._lock: + self.history_limit = history_limit + + def _cleanup_old_tasks(self): + if len(self._tasks) <= self.history_limit: return completed_tasks = [(task_id, task) for task_id, task in self._tasks.items() if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]] completed_tasks.sort(key=lambda x: x[1].end_time or x[1].start_time) - remove_count = len(self._tasks) - keep_count + remove_count = len(self._tasks) - self.history_limit for task_id, _ in completed_tasks[:remove_count]: del self._tasks[task_id] logger.debug(f"Cleaned up old task: {task_id}")