Skip to content

Commit 4fb0ba8

Browse files
Add idempotency key support for task deduplication
Two new fields on SpawnOptions: - `only_once: bool` — auto-derives key from hash(task_name, params) - `idempotency_key: Option<String>` — explicit key (takes precedence) When a key is set, spawn_task checks for an existing non-terminal task with the same key. If found, returns the existing task instead of creating a duplicate. This enables first-served semantics where multiple clients can safely try to spawn the same logical task. DB changes: - New nullable `idempotency_key` column on task tables - Partial unique index on idempotency_key WHERE NOT NULL - Migration adds column/index to all existing queues - ensure_queue_tables updated for new queues Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 06dece0 commit 4fb0ba8

3 files changed

Lines changed: 274 additions & 0 deletions

File tree

src/client.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ struct SpawnOptionsDb<'a> {
2727
cancellation: Option<CancellationPolicyDb>,
2828
#[serde(skip_serializing_if = "Option::is_none")]
2929
parent_task_id: Option<&'a Uuid>,
30+
#[serde(skip_serializing_if = "std::ops::Not::not")]
31+
only_once: bool,
32+
#[serde(skip_serializing_if = "Option::is_none")]
33+
idempotency_key: Option<&'a str>,
3034
}
3135

3236
/// Internal struct for serializing cancellation policy (only non-None fields).
@@ -597,6 +601,8 @@ where
597601
.as_ref()
598602
.and_then(CancellationPolicyDb::from_policy),
599603
parent_task_id: options.parent_task_id.as_ref(),
604+
only_once: options.only_once,
605+
idempotency_key: options.idempotency_key.as_deref(),
600606
};
601607
serde_json::to_value(db_options)
602608
}
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
-- Add idempotency key support for task deduplication.
2+
-- When set, only the first spawn with a given key creates a task.
3+
-- Subsequent spawns with the same key (for non-terminal tasks) are no-ops.
4+
5+
-- 1. Add column and index to all existing queue task tables
6+
do $$
7+
declare
8+
q record;
9+
begin
10+
for q in select queue_name from durable.queues loop
11+
execute format(
12+
'alter table durable.%I add column if not exists idempotency_key text',
13+
't_' || q.queue_name
14+
);
15+
execute format(
16+
'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
17+
('t_' || q.queue_name) || '_ik',
18+
't_' || q.queue_name
19+
);
20+
end loop;
21+
end;
22+
$$;
23+
24+
-- 2. Update ensure_queue_tables so new queues also get the column + index
25+
create or replace function durable.ensure_queue_tables (p_queue_name text)
26+
returns void
27+
language plpgsql
28+
as $$
29+
begin
30+
execute format(
31+
'create table if not exists durable.%I (
32+
task_id uuid primary key,
33+
task_name text not null,
34+
params jsonb not null,
35+
headers jsonb,
36+
retry_strategy jsonb,
37+
max_attempts integer,
38+
cancellation jsonb,
39+
parent_task_id uuid,
40+
idempotency_key text,
41+
enqueue_at timestamptz not null default durable.current_time(),
42+
first_started_at timestamptz,
43+
state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
44+
attempts integer not null default 0,
45+
last_attempt_run uuid,
46+
completed_payload jsonb,
47+
cancelled_at timestamptz
48+
) with (fillfactor=70)',
49+
't_' || p_queue_name
50+
);
51+
52+
-- Idempotency might be added after the table was created; handle both cases
53+
execute format(
54+
'alter table durable.%I add column if not exists idempotency_key text',
55+
't_' || p_queue_name
56+
);
57+
58+
execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
59+
execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": <any JSON value>}.');
60+
execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": <u64>} | {"kind": "exponential", "base_seconds": <u64>, "factor": <f64>, "max_seconds": <u64>}');
61+
execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": <seconds>, "max_duration": <seconds>} - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
62+
execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. When set, only one non-terminal task with this key can exist. Set via SpawnOptions.only_once or SpawnOptions.idempotency_key.');
63+
execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');
64+
65+
execute format(
66+
'create table if not exists durable.%I (
67+
run_id uuid primary key,
68+
task_id uuid not null,
69+
attempt integer not null,
70+
state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
71+
claimed_by text,
72+
claim_expires_at timestamptz,
73+
available_at timestamptz not null,
74+
wake_event text,
75+
event_payload jsonb,
76+
started_at timestamptz,
77+
completed_at timestamptz,
78+
failed_at timestamptz,
79+
result jsonb,
80+
failure_reason jsonb,
81+
created_at timestamptz not null default durable.current_time()
82+
) with (fillfactor=70)',
83+
'r_' || p_queue_name
84+
);
85+
86+
execute format('comment on column durable.%I.wake_event is %L', 'r_' || p_queue_name, 'Event name this run is waiting for while sleeping. Set by await_event when suspending, cleared when the event fires or timeout expires.');
87+
execute format('comment on column durable.%I.event_payload is %L', 'r_' || p_queue_name, 'Payload delivered by emit_event when waking this run. Consumed by await_event on the next claim to return the value to the caller.');
88+
execute format('comment on column durable.%I.result is %L', 'r_' || p_queue_name, 'User-defined. Serialized task output. Schema depends on Task::Output type.');
89+
execute format('comment on column durable.%I.failure_reason is %L', 'r_' || p_queue_name, '{"name": "<error type>", "message": "<string>", "backtrace": "<string>"}');
90+
91+
execute format(
92+
'create table if not exists durable.%I (
93+
task_id uuid not null,
94+
checkpoint_name text not null,
95+
state jsonb,
96+
owner_run_id uuid,
97+
updated_at timestamptz not null default durable.current_time(),
98+
primary key (task_id, checkpoint_name)
99+
) with (fillfactor=70)',
100+
'c_' || p_queue_name
101+
);
102+
103+
execute format('comment on column durable.%I.state is %L', 'c_' || p_queue_name, 'User-defined. Checkpoint value from ctx.step(). Any JSON-serializable value.');
104+
105+
execute format(
106+
'create table if not exists durable.%I (
107+
event_name text primary key,
108+
payload jsonb,
109+
emitted_at timestamptz not null default durable.current_time()
110+
)',
111+
'e_' || p_queue_name
112+
);
113+
114+
execute format('comment on column durable.%I.payload is %L', 'e_' || p_queue_name, 'User-defined. Event payload. Internal child events use: {"status": "completed"|"failed"|"cancelled", "result"?: <json>, "error"?: <json>}');
115+
116+
execute format(
117+
'create table if not exists durable.%I (
118+
task_id uuid not null,
119+
run_id uuid not null,
120+
step_name text not null,
121+
event_name text not null,
122+
timeout_at timestamptz,
123+
created_at timestamptz not null default durable.current_time(),
124+
primary key (run_id, step_name)
125+
)',
126+
'w_' || p_queue_name
127+
);
128+
129+
execute format(
130+
'create index if not exists %I on durable.%I (state, available_at)',
131+
('r_' || p_queue_name) || '_sai',
132+
'r_' || p_queue_name
133+
);
134+
135+
execute format(
136+
'create index if not exists %I on durable.%I (task_id)',
137+
('r_' || p_queue_name) || '_ti',
138+
'r_' || p_queue_name
139+
);
140+
141+
execute format(
142+
'create index if not exists %I on durable.%I (event_name)',
143+
('w_' || p_queue_name) || '_eni',
144+
'w_' || p_queue_name
145+
);
146+
147+
execute format(
148+
'create index if not exists %I on durable.%I (parent_task_id) where parent_task_id is not null',
149+
('t_' || p_queue_name) || '_pti',
150+
't_' || p_queue_name
151+
);
152+
153+
-- Idempotency key unique index (partial: only non-null keys)
154+
execute format(
155+
'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
156+
('t_' || p_queue_name) || '_ik',
157+
't_' || p_queue_name
158+
);
159+
end;
160+
$$;
161+
162+
-- 3. Update spawn_task to handle idempotency key
163+
create or replace function durable.spawn_task (
164+
p_queue_name text,
165+
p_task_name text,
166+
p_params jsonb,
167+
p_options jsonb default '{}'::jsonb
168+
)
169+
returns table (
170+
task_id uuid,
171+
run_id uuid,
172+
attempt integer
173+
)
174+
language plpgsql
175+
as $$
176+
declare
177+
v_task_id uuid := durable.portable_uuidv7();
178+
v_run_id uuid := durable.portable_uuidv7();
179+
v_attempt integer := 1;
180+
v_headers jsonb;
181+
v_retry_strategy jsonb;
182+
v_max_attempts integer;
183+
v_cancellation jsonb;
184+
v_parent_task_id uuid;
185+
v_idempotency_key text;
186+
v_now timestamptz := durable.current_time();
187+
v_params jsonb := coalesce(p_params, 'null'::jsonb);
188+
v_existing_task_id uuid;
189+
begin
190+
if p_task_name is null or length(trim(p_task_name)) = 0 then
191+
raise exception 'task_name must be provided';
192+
end if;
193+
194+
if p_options is not null then
195+
v_headers := p_options->'headers';
196+
v_retry_strategy := p_options->'retry_strategy';
197+
if p_options ? 'max_attempts' then
198+
v_max_attempts := (p_options->>'max_attempts')::int;
199+
if v_max_attempts is not null and v_max_attempts < 1 then
200+
raise exception 'max_attempts must be >= 1';
201+
end if;
202+
end if;
203+
v_cancellation := p_options->'cancellation';
204+
v_parent_task_id := (p_options->>'parent_task_id')::uuid;
205+
206+
-- Resolve idempotency key: explicit key takes precedence over only_once
207+
v_idempotency_key := p_options->>'idempotency_key';
208+
if v_idempotency_key is null and (p_options->>'only_once')::boolean = true then
209+
v_idempotency_key := md5(p_task_name || '::' || v_params::text);
210+
end if;
211+
end if;
212+
213+
-- Idempotency check: return existing non-terminal task if key matches
214+
if v_idempotency_key is not null then
215+
execute format(
216+
'select t.task_id from durable.%I t
217+
where t.idempotency_key = $1
218+
and t.state not in (''completed'', ''failed'', ''cancelled'')
219+
limit 1',
220+
't_' || p_queue_name
221+
)
222+
into v_existing_task_id
223+
using v_idempotency_key;
224+
225+
if v_existing_task_id is not null then
226+
return query
227+
execute format(
228+
'select t.task_id, r.run_id, r.attempt
229+
from durable.%I t
230+
join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
231+
where t.task_id = $1',
232+
't_' || p_queue_name,
233+
'r_' || p_queue_name
234+
)
235+
using v_existing_task_id;
236+
return;
237+
end if;
238+
end if;
239+
240+
execute format(
241+
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
242+
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)',
243+
't_' || p_queue_name
244+
)
245+
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;
246+
247+
execute format(
248+
'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
249+
values ($1, $2, $3, ''pending'', $4, null, null, null, null)',
250+
'r_' || p_queue_name
251+
)
252+
using v_run_id, v_task_id, v_attempt, v_now;
253+
254+
return query select v_task_id, v_run_id, v_attempt;
255+
end;
256+
$$;

src/types.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,18 @@ pub struct SpawnOptions {
137137

138138
#[serde(skip_serializing_if = "Option::is_none")]
139139
pub(crate) parent_task_id: Option<Uuid>,
140+
141+
/// When true, auto-derive an idempotency key from `hash(task_name, params)`.
142+
/// Only the first spawn creates a task; subsequent spawns with identical
143+
/// `(task_name, params)` are no-ops that return the existing task.
144+
#[serde(skip_serializing_if = "std::ops::Not::not")]
145+
pub only_once: bool,
146+
147+
/// Explicit idempotency key for deduplication.
148+
/// Use when params may differ but the operation is logically the same.
149+
/// Takes precedence over `only_once`.
150+
#[serde(skip_serializing_if = "Option::is_none")]
151+
pub idempotency_key: Option<String>,
140152
}
141153

142154
/// Options for configuring a worker.

0 commit comments

Comments
 (0)