Skip to content

Commit c1c7759

Browse files
committed
feat: enque pipelines based on priority
Signed-off-by: Keshav Priyadarshi <git@keshav.space>
1 parent 1c67f3e commit c1c7759

2 files changed

Lines changed: 25 additions & 4 deletions

File tree

vulnerabilities/schedules.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,21 @@ def update_pipeline_schedule():
9595
PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete()
9696
for id, pipeline_class in pipelines.items():
9797
run_once = getattr(pipeline_class, "run_once", False)
98+
run_interval = getattr(pipeline_class, "run_interval", 24)
99+
run_priority = getattr(
100+
pipeline_class, "run_priority", PipelineSchedule.ExecutionPriority.DEFAULT
101+
)
98102

99-
PipelineSchedule.objects.get_or_create(
103+
pipeline, created = PipelineSchedule.objects.get_or_create(
100104
pipeline_id=id,
101105
defaults={
102106
"is_run_once": run_once,
107+
"run_interval": run_interval,
108+
"run_priority": run_priority,
103109
},
104110
)
111+
112+
if not created:
113+
pipeline.run_priority = run_priority
114+
pipeline.run_interval = run_interval
115+
pipeline.save()

vulnerabilities/tasks.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,13 @@
2020

2121
logger = logging.getLogger(__name__)
2222

23-
queue = django_rq.get_queue("default")
23+
default_queue = django_rq.get_queue("default")
24+
high_queue = django_rq.get_queue("high")
25+
26+
queues = {
27+
"default": django_rq.get_queue("default"),
28+
"high": django_rq.get_queue("high"),
29+
}
2430

2531

2632
def execute_pipeline(pipeline_id, run_id):
@@ -112,6 +118,8 @@ def set_run_failure(job, connection, type, value, traceback):
112118

113119
def enqueue_pipeline(pipeline_id):
114120
pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id)
121+
queue = queues.get(pipeline_schedule.get_priority_display())
122+
115123
if pipeline_schedule.status in [
116124
models.PipelineRun.Status.RUNNING,
117125
models.PipelineRun.Status.QUEUED,
@@ -139,5 +147,7 @@ def enqueue_pipeline(pipeline_id):
139147

140148
def dequeue_job(job_id):
141149
"""Remove a job from queue if it hasn't been executed yet."""
142-
if job_id in queue.jobs:
143-
queue.remove(job_id)
150+
151+
for queue in queues.values():
152+
if job_id in queue.jobs:
153+
queue.remove(job_id)

0 commit comments

Comments
 (0)