Skip to content

Commit 03f2c5f

Browse files
authored
Add count_unclaimed_ready_tasks function (#84)
We can use this to monitor our running autopilot queue, and alarm if a large number of tasks are persistently unclaimed
1 parent 2bfe262 commit 03f2c5f

4 files changed

Lines changed: 292 additions & 0 deletions

File tree

sql/schema.sql

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,33 @@ begin
611611
end;
612612
$$;
613613

614+
-- Counts unclaimed runs that are ready to be claimed.
615+
-- Uses the same candidate logic as claim_task but without locking or updating.
616+
create function durable.count_unclaimed_ready_tasks (
617+
p_queue_name text
618+
)
619+
returns bigint
620+
language plpgsql
621+
as $$
622+
declare
623+
v_now timestamptz := durable.current_time();
624+
v_count bigint;
625+
begin
626+
execute format(
627+
'select count(*)
628+
from durable.%1$I r
629+
join durable.%2$I t on t.task_id = r.task_id
630+
where r.state in (''pending'', ''sleeping'')
631+
and t.state in (''pending'', ''sleeping'', ''running'')
632+
and r.available_at <= $1',
633+
'r_' || p_queue_name,
634+
't_' || p_queue_name
635+
) into v_count using v_now;
636+
637+
return v_count;
638+
end;
639+
$$;
640+
614641
-- Marks a run as completed
615642
create function durable.complete_run (
616643
p_queue_name text,

src/client.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,22 @@ where
721721
Ok(())
722722
}
723723

724+
/// Count unclaimed tasks that are ready to be claimed in a queue.
725+
///
726+
/// All of these tasks can be claimed by a running worker on the provided queue.
727+
pub async fn count_unclaimed_ready_tasks(
728+
&self,
729+
queue_name: Option<&str>,
730+
) -> DurableResult<i64> {
731+
let queue = queue_name.unwrap_or(&self.queue_name);
732+
let query = "SELECT durable.count_unclaimed_ready_tasks($1)";
733+
let (count,): (i64,) = sqlx::query_as(query)
734+
.bind(queue)
735+
.fetch_one(&self.pool)
736+
.await?;
737+
Ok(count)
738+
}
739+
724740
/// Cancel a task by ID. Running tasks will be cancelled at
725741
/// their next checkpoint or heartbeat.
726742
pub async fn cancel_task(&self, task_id: Uuid, queue_name: Option<&str>) -> DurableResult<()> {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Add function to count unclaimed runs that are ready to be claimed.
2+
-- Uses the same candidate logic as claim_task but without locking or updating.
3+
create or replace function durable.count_unclaimed_ready_tasks (
4+
p_queue_name text
5+
)
6+
returns bigint
7+
language plpgsql
8+
as $$
9+
declare
10+
v_now timestamptz := durable.current_time();
11+
v_count bigint;
12+
begin
13+
execute format(
14+
'select count(*)
15+
from durable.%1$I r
16+
join durable.%2$I t on t.task_id = r.task_id
17+
where r.state in (''pending'', ''sleeping'')
18+
and t.state in (''pending'', ''sleeping'', ''running'')
19+
and r.available_at <= $1',
20+
'r_' || p_queue_name,
21+
't_' || p_queue_name
22+
) into v_count using v_now;
23+
24+
return v_count;
25+
end;
26+
$$;

tests/count_unclaimed_test.rs

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
2+
3+
mod common;
4+
5+
use common::tasks::{EchoParams, EchoTask};
6+
use durable::{Durable, DurableBuilder, MIGRATOR, WorkerOptions};
7+
use sqlx::{AssertSqlSafe, PgPool};
8+
use std::time::Duration;
9+
10+
/// Helper to create a DurableBuilder from the test pool.
11+
fn create_client(pool: PgPool, queue_name: &str) -> DurableBuilder {
12+
Durable::builder().pool(pool).queue_name(queue_name)
13+
}
14+
15+
// ============================================================================
16+
// count_unclaimed_ready_tasks Tests
17+
// ============================================================================
18+
19+
#[sqlx::test(migrator = "MIGRATOR")]
20+
async fn test_count_unclaimed_empty_queue(pool: PgPool) -> sqlx::Result<()> {
21+
let client = create_client(pool.clone(), "count_empty")
22+
.register::<EchoTask>()
23+
.unwrap()
24+
.build()
25+
.await
26+
.unwrap();
27+
client.create_queue(None).await.unwrap();
28+
29+
let count = client
30+
.count_unclaimed_ready_tasks(None)
31+
.await
32+
.expect("Failed to count unclaimed tasks");
33+
assert_eq!(count, 0, "Empty queue should have 0 unclaimed tasks");
34+
35+
Ok(())
36+
}
37+
38+
#[sqlx::test(migrator = "MIGRATOR")]
39+
async fn test_count_unclaimed_after_spawning(pool: PgPool) -> sqlx::Result<()> {
40+
let client = create_client(pool.clone(), "count_spawn")
41+
.register::<EchoTask>()
42+
.unwrap()
43+
.build()
44+
.await
45+
.unwrap();
46+
client.create_queue(None).await.unwrap();
47+
48+
// Spawn 3 tasks
49+
for i in 0..3 {
50+
client
51+
.spawn::<EchoTask>(EchoParams {
52+
message: format!("task {i}"),
53+
})
54+
.await
55+
.expect("Failed to spawn task");
56+
}
57+
58+
let count = client
59+
.count_unclaimed_ready_tasks(None)
60+
.await
61+
.expect("Failed to count unclaimed tasks");
62+
assert_eq!(count, 3, "Should have 3 unclaimed tasks after spawning 3");
63+
64+
Ok(())
65+
}
66+
67+
#[sqlx::test(migrator = "MIGRATOR")]
68+
async fn test_count_unclaimed_decreases_after_claim(pool: PgPool) -> sqlx::Result<()> {
69+
let client = create_client(pool.clone(), "count_claim")
70+
.register::<EchoTask>()
71+
.unwrap()
72+
.build()
73+
.await
74+
.unwrap();
75+
client.create_queue(None).await.unwrap();
76+
77+
// Spawn 3 tasks
78+
for i in 0..3 {
79+
client
80+
.spawn::<EchoTask>(EchoParams {
81+
message: format!("task {i}"),
82+
})
83+
.await
84+
.expect("Failed to spawn task");
85+
}
86+
87+
assert_eq!(
88+
client.count_unclaimed_ready_tasks(None).await.unwrap(),
89+
3,
90+
"Should start with 3 unclaimed"
91+
);
92+
93+
// Start a worker that will claim and complete tasks
94+
let worker = client
95+
.start_worker(WorkerOptions {
96+
concurrency: 3,
97+
poll_interval: Duration::from_millis(100),
98+
..Default::default()
99+
})
100+
.await
101+
.unwrap();
102+
103+
// Wait for the worker to process all tasks
104+
tokio::time::sleep(Duration::from_secs(2)).await;
105+
106+
let count = client
107+
.count_unclaimed_ready_tasks(None)
108+
.await
109+
.expect("Failed to count unclaimed tasks");
110+
assert_eq!(
111+
count, 0,
112+
"Should have 0 unclaimed tasks after worker claims them"
113+
);
114+
115+
worker.shutdown().await;
116+
117+
Ok(())
118+
}
119+
120+
#[sqlx::test(migrator = "MIGRATOR")]
121+
async fn test_count_unclaimed_with_explicit_queue_name(pool: PgPool) -> sqlx::Result<()> {
122+
let client = create_client(pool.clone(), "default")
123+
.register::<EchoTask>()
124+
.unwrap()
125+
.build()
126+
.await
127+
.unwrap();
128+
129+
// Create two queues
130+
client.create_queue(Some("queue_a")).await.unwrap();
131+
client.create_queue(Some("queue_b")).await.unwrap();
132+
133+
// Spawn tasks into queue_a
134+
let client_a = create_client(pool.clone(), "queue_a")
135+
.register::<EchoTask>()
136+
.unwrap()
137+
.build()
138+
.await
139+
.unwrap();
140+
for i in 0..2 {
141+
client_a
142+
.spawn::<EchoTask>(EchoParams {
143+
message: format!("a-{i}"),
144+
})
145+
.await
146+
.unwrap();
147+
}
148+
149+
// Spawn tasks into queue_b
150+
let client_b = create_client(pool.clone(), "queue_b")
151+
.register::<EchoTask>()
152+
.unwrap()
153+
.build()
154+
.await
155+
.unwrap();
156+
for i in 0..5 {
157+
client_b
158+
.spawn::<EchoTask>(EchoParams {
159+
message: format!("b-{i}"),
160+
})
161+
.await
162+
.unwrap();
163+
}
164+
165+
// Count using explicit queue names
166+
let count_a = client
167+
.count_unclaimed_ready_tasks(Some("queue_a"))
168+
.await
169+
.unwrap();
170+
let count_b = client
171+
.count_unclaimed_ready_tasks(Some("queue_b"))
172+
.await
173+
.unwrap();
174+
175+
assert_eq!(count_a, 2, "queue_a should have 2 unclaimed tasks");
176+
assert_eq!(count_b, 5, "queue_b should have 5 unclaimed tasks");
177+
178+
Ok(())
179+
}
180+
181+
#[sqlx::test(migrator = "MIGRATOR")]
182+
async fn test_count_unclaimed_excludes_future_tasks(pool: PgPool) -> sqlx::Result<()> {
183+
let client = create_client(pool.clone(), "count_future")
184+
.register::<EchoTask>()
185+
.unwrap()
186+
.build()
187+
.await
188+
.unwrap();
189+
client.create_queue(None).await.unwrap();
190+
191+
// Spawn two tasks (both ready now)
192+
client
193+
.spawn::<EchoTask>(EchoParams {
194+
message: "ready now".to_string(),
195+
})
196+
.await
197+
.unwrap();
198+
let delayed = client
199+
.spawn::<EchoTask>(EchoParams {
200+
message: "will be delayed".to_string(),
201+
})
202+
.await
203+
.unwrap();
204+
205+
// Push one run's available_at into the future via direct SQL
206+
sqlx::query(AssertSqlSafe(
207+
"UPDATE durable.r_count_future SET available_at = now() + interval '1 hour' WHERE task_id = $1".to_string()
208+
))
209+
.bind(delayed.task_id)
210+
.execute(&pool)
211+
.await?;
212+
213+
let count = client
214+
.count_unclaimed_ready_tasks(None)
215+
.await
216+
.expect("Failed to count unclaimed tasks");
217+
assert_eq!(
218+
count, 1,
219+
"Should only count the immediately-ready task, not the delayed one"
220+
);
221+
222+
Ok(())
223+
}

0 commit comments

Comments
 (0)