Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lightx2v/server/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def main():
parser.add_argument("--host", type=str, default="0.0.0.0", help="Server host")
parser.add_argument("--port", type=int, default=8000, help="Server port")
parser.add_argument("--max_queue_size", type=int, default=10, help="Maximum active tasks (pending + processing)")
parser.add_argument("--history_limit", type=int, default=1000, help="Maximum completed tasks to keep in history")

args, unknown = parser.parse_known_args()

Expand Down
12 changes: 11 additions & 1 deletion lightx2v/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class ServerConfig:
max_queue_size: int = 10

task_timeout: int = 300
task_history_limit: int = 1000
history_limit: int = 1000

http_timeout: int = 30
http_max_retries: int = 3
Expand Down Expand Up @@ -41,6 +41,12 @@ def from_env(cls) -> "ServerConfig":
except ValueError:
logger.warning(f"Invalid max queue size: {env_queue_size}")

if env_history_limit := os.environ.get("LIGHTX2V_HISTORY_LIMIT"):
try:
config.history_limit = int(env_history_limit)
except ValueError:
logger.warning(f"Invalid task history limit: {env_history_limit}")

# MASTER_ADDR is now managed by torchrun, no need to set manually

if env_cache_dir := os.environ.get("LIGHTX2V_CACHE_DIR"):
Expand All @@ -59,6 +65,10 @@ def validate(self) -> bool:
logger.error("task_timeout must be positive")
valid = False

if self.history_limit < 0:
logger.error("history_limit must be >= 0")
valid = False

return valid


Expand Down
5 changes: 5 additions & 0 deletions lightx2v/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ def _signal_handler(signum, frame):
task_manager.set_max_queue_size(server_config.max_queue_size)
logger.info(f"Task queue size set to {server_config.max_queue_size}")

if hasattr(args, "history_limit") and args.history_limit is not None:
server_config.history_limit = int(args.history_limit)
task_manager.set_history_limit(server_config.history_limit)
logger.info(f"Task history limit set to {server_config.history_limit}")

if not server_config.validate():
raise RuntimeError("Invalid server configuration")

Expand Down
16 changes: 12 additions & 4 deletions lightx2v/server/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ class TaskInfo:


class TaskManager:
def __init__(self, max_queue_size: int = 100):
def __init__(self, max_queue_size: int = 100, history_limit: int = 1000):
self.max_queue_size = max_queue_size
self.history_limit = history_limit

self._tasks: OrderedDict[str, TaskInfo] = OrderedDict()
self._lock = threading.RLock()
Expand Down Expand Up @@ -94,6 +95,7 @@ def complete_task(self, task_id: str, save_result_path: Optional[str] = None, re
task.end_time = datetime.now()
task.save_result_path = save_result_path
task.result_png = result_png
task.message = None

self.completed_tasks += 1
self._emit_queue_metrics_unlocked()
Expand Down Expand Up @@ -231,15 +233,21 @@ def set_max_queue_size(self, max_queue_size: int):
self.max_queue_size = max_queue_size
self._emit_queue_metrics_unlocked()

def _cleanup_old_tasks(self, keep_count: int = 1000):
if len(self._tasks) <= keep_count:
def set_history_limit(self, history_limit: int):
if history_limit < 0:
raise ValueError("history_limit must be >= 0")
with self._lock:
self.history_limit = history_limit

def _cleanup_old_tasks(self):
if len(self._tasks) <= self.history_limit:
return

completed_tasks = [(task_id, task) for task_id, task in self._tasks.items() if task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED]]

completed_tasks.sort(key=lambda x: x[1].end_time or x[1].start_time)

remove_count = len(self._tasks) - keep_count
remove_count = len(self._tasks) - self.history_limit
for task_id, _ in completed_tasks[:remove_count]:
del self._tasks[task_id]
logger.debug(f"Cleaned up old task: {task_id}")
Expand Down
Loading