-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathscheduler.py
More file actions
executable file
·78 lines (59 loc) · 2.53 KB
/
scheduler.py
File metadata and controls
executable file
·78 lines (59 loc) · 2.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Background scheduler — checks for due tasks and runs them automatically."""
from __future__ import annotations
import asyncio
import logging
import os
from datetime import datetime
from zoneinfo import ZoneInfo
import state
# Timezone for schedule matching (configurable via env var)
_SCHEDULER_TZ = ZoneInfo(os.getenv("SCHEDULER_TIMEZONE", "Asia/Jerusalem"))
logger = logging.getLogger(__name__)
async def scheduler_loop(check_interval: int = 60):
"""Run an infinite loop that checks for due schedules every `check_interval` seconds."""
logger.info(f"Scheduler started (checking every {check_interval}s)")
while True:
await asyncio.sleep(check_interval)
try:
await _check_due_schedules()
except Exception as e:
logger.error(f"Scheduler error: {e}", exc_info=True)
async def _check_due_schedules():
"""Check for schedules due at the current HH:MM and trigger them."""
if not state.session_mgr:
return
now = datetime.now(tz=_SCHEDULER_TZ)
current_time = now.strftime("%H:%M")
due = await state.session_mgr.get_due_schedules(current_time)
if not due:
return
logger.info(f"Scheduler: {len(due)} schedule(s) due at {current_time}")
for schedule in due:
schedule_id = schedule["id"]
project_id = schedule["project_id"]
# user_id available in schedule["user_id"] if needed
task_desc = schedule["task_description"]
repeat = schedule.get("repeat", "once")
# Find or create manager
manager, _ = await state.get_manager_safe(project_id)
if not manager:
logger.warning(
f"Scheduler: no manager for project {project_id}, skipping schedule {schedule_id}"
)
continue
logger.info(
f"Scheduler: triggering schedule {schedule_id} for project {project_id}: {task_desc[:80]}"
)
try:
if not manager.is_running:
await manager.start_session(task_desc)
else:
await manager.inject_user_message("orchestrator", task_desc)
# Mark as run
await state.session_mgr.mark_schedule_run(schedule_id)
# Disable one-time schedules
if repeat == "once":
await state.session_mgr.disable_schedule(schedule_id)
logger.info(f"Scheduler: disabled one-time schedule {schedule_id}")
except Exception as e:
logger.error(f"Scheduler: failed to trigger schedule {schedule_id}: {e}", exc_info=True)