Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/database-jobs/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
EXTENSION = pgpm-database-jobs
DATA = sql/pgpm-database-jobs--0.26.0.sql
DATA = sql/pgpm-database-jobs--0.26.1.sql

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ exports[`scheduled jobs schedule jobs 1`] = `
"attempts": 0,
"database_id": "5b720132-17d5-424d-9bcb-ee7b17c13d43",
"entity_id": null,
"entity_type": null,
"id": "1",
"is_available": true,
"key": null,
"last_error": null,
"locked_at": null,
"locked_by": null,
"max_attempts": 25,
"organization_id": null,
"payload": {
"just": "run it",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ CREATE FUNCTION app_jobs.add_job (
run_at timestamptz DEFAULT now(),
max_attempts integer DEFAULT 25,
priority integer DEFAULT 0,
entity_id uuid DEFAULT NULL
entity_id uuid DEFAULT NULL,
organization_id uuid DEFAULT NULL,
entity_type text DEFAULT NULL
)
RETURNS app_jobs.jobs
AS $$
Expand All @@ -33,6 +35,8 @@ BEGIN
database_id,
actor_id,
entity_id,
organization_id,
entity_type,
task_identifier,
payload,
queue_name,
Expand All @@ -44,6 +48,8 @@ BEGIN
v_database_id,
v_actor_id,
add_job.entity_id,
add_job.organization_id,
add_job.entity_type,
identifier,
coalesce(payload, '{}'::json),
queue_name,
Expand Down Expand Up @@ -88,6 +94,8 @@ BEGIN
database_id,
actor_id,
entity_id,
organization_id,
entity_type,
task_identifier,
payload,
queue_name,
Expand All @@ -98,6 +106,8 @@ BEGIN
v_database_id,
v_actor_id,
add_job.entity_id,
add_job.organization_id,
add_job.entity_type,
identifier,
payload,
queue_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ CREATE TABLE app_jobs.jobs (
database_id uuid,
actor_id uuid,
entity_id uuid,
organization_id uuid,
entity_type text,
queue_name text DEFAULT NULL,
task_identifier text NOT NULL,
payload json DEFAULT '{}' ::json NOT NULL,
Expand All @@ -32,6 +34,8 @@ COMMENT ON COLUMN app_jobs.jobs.id IS 'Auto-incrementing job identifier';
COMMENT ON COLUMN app_jobs.jobs.database_id IS 'Database this job belongs to (nullable for system-level jobs without tenant context)';
COMMENT ON COLUMN app_jobs.jobs.actor_id IS 'User who triggered this job, read from JWT claims at enqueue time';
COMMENT ON COLUMN app_jobs.jobs.entity_id IS 'Entity (org/team) this job is scoped to for billing; NULL means platform-level (resolved via database_id → owner_id)';
COMMENT ON COLUMN app_jobs.jobs.organization_id IS 'Top-level organization for this entity; resolved at enqueue time via get_organization_id(entity_type, entity_id)';
COMMENT ON COLUMN app_jobs.jobs.entity_type IS 'Entity type prefix (org, team, app, etc.) for interpreting entity_id';
COMMENT ON COLUMN app_jobs.jobs.queue_name IS 'Name of the queue this job belongs to; used for worker routing and concurrency control';
COMMENT ON COLUMN app_jobs.jobs.task_identifier IS 'Identifier for the task type (maps to a worker handler function)';
COMMENT ON COLUMN app_jobs.jobs.payload IS 'JSON payload of arguments passed to the task handler';
Expand Down
2 changes: 1 addition & 1 deletion packages/database-jobs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
"bugs": {
"url": "https://github.com/constructive-io/pgpm-modules/issues"
}
}
}
2 changes: 1 addition & 1 deletion packages/database-jobs/pgpm-database-jobs.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# pgpm-database-jobs extension
comment = 'pgpm-database-jobs extension'
default_version = '0.26.0'
default_version = '0.26.1'
module_pathname = '$libdir/pgpm-database-jobs'
requires = 'plpgsql,pgcrypto,pgpm-verify,pgpm-jwt-claims'
relocatable = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ $EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

COMMENT ON FUNCTION app_jobs.tg_add_job_with_row IS 'Useful shortcut to create a job on insert or update. Pass the task name as the trigger argument, and the record data will automatically be available on the JSON payload.';

CREATE FUNCTION app_jobs.json_build_object_apply(
arguments text[]
) RETURNS pg_catalog.json AS $EOFCODE$
CREATE FUNCTION app_jobs.json_build_object_apply(arguments text[]) RETURNS pg_catalog.json AS $EOFCODE$
DECLARE
arg text;
_sql text;
Expand Down Expand Up @@ -195,6 +193,8 @@ CREATE TABLE app_jobs.jobs (
database_id uuid,
actor_id uuid,
entity_id uuid,
organization_id uuid,
entity_type text,
queue_name text DEFAULT NULL,
task_identifier text NOT NULL,
payload pg_catalog.json DEFAULT '{}'::json NOT NULL,
Expand Down Expand Up @@ -226,6 +226,10 @@ COMMENT ON COLUMN app_jobs.jobs.actor_id IS 'User who triggered this job, read f

COMMENT ON COLUMN app_jobs.jobs.entity_id IS 'Entity (org/team) this job is scoped to for billing; NULL means platform-level (resolved via database_id → owner_id)';

COMMENT ON COLUMN app_jobs.jobs.organization_id IS 'Top-level organization for this entity; resolved at enqueue time via get_organization_id(entity_type, entity_id)';

COMMENT ON COLUMN app_jobs.jobs.entity_type IS 'Entity type prefix (org, team, app, etc.) for interpreting entity_id';

COMMENT ON COLUMN app_jobs.jobs.queue_name IS 'Name of the queue this job belongs to; used for worker routing and concurrency control';

COMMENT ON COLUMN app_jobs.jobs.task_identifier IS 'Identifier for the task type (maps to a worker handler function)';
Expand Down Expand Up @@ -374,10 +378,7 @@ CREATE INDEX job_queues_locked_by_idx ON app_jobs.job_queues (locked_by);

GRANT SELECT, INSERT, UPDATE, DELETE ON app_jobs.job_queues TO administrator;

CREATE FUNCTION app_jobs.run_scheduled_job(
id bigint,
job_expiry interval DEFAULT '1 hours'
) RETURNS app_jobs.jobs AS $EOFCODE$
CREATE FUNCTION app_jobs.run_scheduled_job(id bigint, job_expiry interval DEFAULT '1 hours') RETURNS app_jobs.jobs AS $EOFCODE$
DECLARE
j app_jobs.jobs;
last_id bigint;
Expand Down Expand Up @@ -448,13 +449,7 @@ BEGIN
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;

CREATE FUNCTION app_jobs.reschedule_jobs(
job_ids bigint[],
run_at timestamptz DEFAULT NULL,
priority int DEFAULT NULL,
attempts int DEFAULT NULL,
max_attempts int DEFAULT NULL
) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
CREATE FUNCTION app_jobs.reschedule_jobs(job_ids bigint[], run_at timestamptz DEFAULT NULL, priority int DEFAULT NULL, attempts int DEFAULT NULL, max_attempts int DEFAULT NULL) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
UPDATE
app_jobs.jobs
SET
Expand All @@ -470,10 +465,7 @@ CREATE FUNCTION app_jobs.reschedule_jobs(
*;
$EOFCODE$;

CREATE FUNCTION app_jobs.release_scheduled_jobs(
worker_id text,
ids bigint[] DEFAULT NULL
) RETURNS void AS $EOFCODE$
CREATE FUNCTION app_jobs.release_scheduled_jobs(worker_id text, ids bigint[] DEFAULT NULL) RETURNS void AS $EOFCODE$
DECLARE
BEGIN
-- clear the scheduled job
Expand All @@ -489,9 +481,7 @@ BEGIN
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;

CREATE FUNCTION app_jobs.release_jobs(
worker_id text
) RETURNS void AS $EOFCODE$
CREATE FUNCTION app_jobs.release_jobs(worker_id text) RETURNS void AS $EOFCODE$
DECLARE
BEGIN
-- clear the job
Expand All @@ -514,10 +504,7 @@ BEGIN
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE;

CREATE FUNCTION app_jobs.permanently_fail_jobs(
job_ids bigint[],
error_message text DEFAULT NULL
) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
CREATE FUNCTION app_jobs.permanently_fail_jobs(job_ids bigint[], error_message text DEFAULT NULL) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
UPDATE
app_jobs.jobs
SET
Expand All @@ -531,10 +518,7 @@ CREATE FUNCTION app_jobs.permanently_fail_jobs(
*;
$EOFCODE$;

CREATE FUNCTION app_jobs.get_scheduled_job(
worker_id text,
task_identifiers text[] DEFAULT NULL
) RETURNS app_jobs.scheduled_jobs LANGUAGE plpgsql AS $EOFCODE$
CREATE FUNCTION app_jobs.get_scheduled_job(worker_id text, task_identifiers text[] DEFAULT NULL) RETURNS app_jobs.scheduled_jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_job_id bigint;
v_row app_jobs.scheduled_jobs;
Expand Down Expand Up @@ -586,11 +570,7 @@ BEGIN
END;
$EOFCODE$;

CREATE FUNCTION app_jobs.get_job(
worker_id text,
task_identifiers text[] DEFAULT NULL,
job_expiry interval DEFAULT '4 hours'
) RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
CREATE FUNCTION app_jobs.get_job(worker_id text, task_identifiers text[] DEFAULT NULL, job_expiry interval DEFAULT '4 hours') RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_job_id bigint;
v_queue_name text;
Expand Down Expand Up @@ -645,11 +625,7 @@ BEGIN
END;
$EOFCODE$;

CREATE FUNCTION app_jobs.fail_job(
worker_id text,
job_id bigint,
error_message text
) RETURNS app_jobs.jobs LANGUAGE plpgsql STRICT AS $EOFCODE$
CREATE FUNCTION app_jobs.fail_job(worker_id text, job_id bigint, error_message text) RETURNS app_jobs.jobs LANGUAGE plpgsql STRICT AS $EOFCODE$
DECLARE
v_row app_jobs.jobs;
BEGIN
Expand Down Expand Up @@ -679,9 +655,7 @@ BEGIN
END;
$EOFCODE$;

CREATE FUNCTION app_jobs.complete_jobs(
job_ids bigint[]
) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
CREATE FUNCTION app_jobs.complete_jobs(job_ids bigint[]) RETURNS SETOF app_jobs.jobs LANGUAGE sql AS $EOFCODE$
DELETE FROM app_jobs.jobs
WHERE id = ANY (job_ids)
AND (locked_by IS NULL
Expand All @@ -690,10 +664,7 @@ CREATE FUNCTION app_jobs.complete_jobs(
*;
$EOFCODE$;

CREATE FUNCTION app_jobs.complete_job(
worker_id text,
job_id bigint
) RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
CREATE FUNCTION app_jobs.complete_job(worker_id text, job_id bigint) RETURNS app_jobs.jobs LANGUAGE plpgsql AS $EOFCODE$
DECLARE
v_row app_jobs.jobs;
BEGIN
Expand All @@ -715,16 +686,7 @@ BEGIN
END;
$EOFCODE$;

CREATE FUNCTION app_jobs.add_scheduled_job(
identifier text,
payload pg_catalog.json DEFAULT '{}'::json,
schedule_info pg_catalog.json DEFAULT '{}'::json,
job_key text DEFAULT NULL,
queue_name text DEFAULT NULL,
max_attempts int DEFAULT 25,
priority int DEFAULT 0,
entity_id uuid DEFAULT NULL
) RETURNS app_jobs.scheduled_jobs AS $EOFCODE$
CREATE FUNCTION app_jobs.add_scheduled_job(identifier text, payload pg_catalog.json DEFAULT '{}'::json, schedule_info pg_catalog.json DEFAULT '{}'::json, job_key text DEFAULT NULL, queue_name text DEFAULT NULL, max_attempts int DEFAULT 25, priority int DEFAULT 0, entity_id uuid DEFAULT NULL) RETURNS app_jobs.scheduled_jobs AS $EOFCODE$
DECLARE
v_job app_jobs.scheduled_jobs;
v_database_id uuid;
Expand Down Expand Up @@ -812,16 +774,7 @@ BEGIN
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

CREATE FUNCTION app_jobs.add_job(
identifier text,
payload pg_catalog.json DEFAULT '{}'::json,
job_key text DEFAULT NULL,
queue_name text DEFAULT NULL,
run_at timestamptz DEFAULT now(),
max_attempts int DEFAULT 25,
priority int DEFAULT 0,
entity_id uuid DEFAULT NULL
) RETURNS app_jobs.jobs AS $EOFCODE$
CREATE FUNCTION app_jobs.add_job(identifier text, payload pg_catalog.json DEFAULT '{}'::json, job_key text DEFAULT NULL, queue_name text DEFAULT NULL, run_at timestamptz DEFAULT now(), max_attempts int DEFAULT 25, priority int DEFAULT 0, entity_id uuid DEFAULT NULL, organization_id uuid DEFAULT NULL, entity_type text DEFAULT NULL) RETURNS app_jobs.jobs AS $EOFCODE$
DECLARE
v_job app_jobs.jobs;
v_database_id uuid;
Expand All @@ -837,6 +790,8 @@ BEGIN
database_id,
actor_id,
entity_id,
organization_id,
entity_type,
task_identifier,
payload,
queue_name,
Expand All @@ -848,6 +803,8 @@ BEGIN
v_database_id,
v_actor_id,
add_job.entity_id,
add_job.organization_id,
add_job.entity_type,
identifier,
coalesce(payload, '{}'::json),
queue_name,
Expand Down Expand Up @@ -892,6 +849,8 @@ BEGIN
database_id,
actor_id,
entity_id,
organization_id,
entity_type,
task_identifier,
payload,
queue_name,
Expand All @@ -902,6 +861,8 @@ BEGIN
v_database_id,
v_actor_id,
add_job.entity_id,
add_job.organization_id,
add_job.entity_type,
identifier,
payload,
queue_name,
Expand All @@ -915,9 +876,7 @@ BEGIN
END;
$EOFCODE$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

CREATE FUNCTION app_jobs.remove_job(
job_key text
) RETURNS app_jobs.jobs LANGUAGE plpgsql STRICT AS $EOFCODE$
CREATE FUNCTION app_jobs.remove_job(job_key text) RETURNS app_jobs.jobs LANGUAGE plpgsql STRICT AS $EOFCODE$
DECLARE
v_job app_jobs.jobs;
BEGIN
Expand All @@ -942,9 +901,7 @@ BEGIN
END;
$EOFCODE$;

CREATE FUNCTION app_jobs.force_unlock_workers(
worker_ids text[]
) RETURNS void LANGUAGE sql VOLATILE AS $EOFCODE$
CREATE FUNCTION app_jobs.force_unlock_workers(worker_ids text[]) RETURNS void LANGUAGE sql VOLATILE AS $EOFCODE$
UPDATE app_jobs.jobs
SET locked_at = NULL, locked_by = NULL
WHERE locked_by = ANY (worker_ids);
Expand Down
2 changes: 1 addition & 1 deletion packages/metaschema-modules/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
EXTENSION = metaschema-modules
DATA = sql/metaschema-modules--0.26.0.sql
DATA = sql/metaschema-modules--0.26.1.sql

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ exports[`db_meta_modules should verify module table structures have database_id

exports[`db_meta_modules should verify module tables have proper foreign key relationships 1`] = `
{
"constraintCount": 394604,
"constraintCount": 396560,
"foreignTables": [
"database",
"field",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ CREATE TABLE metaschema_modules_public.agent_module (
-- Multi-tenant scope
membership_type int DEFAULT NULL,

-- Module key discriminator: allows multiple agent modules per scope.
-- 'default' is omitted from table names, any other value becomes
-- an infix: {prefix}_{key}_agent_thread.
-- Max 16 chars, lowercase snake_case.
key text NOT NULL DEFAULT 'default',

-- Entity table for RLS (NULL for app-level, entity table for entity-scoped)
entity_table_id uuid NULL,

Expand Down Expand Up @@ -69,9 +75,9 @@ CREATE TABLE metaschema_modules_public.agent_module (

CREATE INDEX agent_module_database_id_idx ON metaschema_modules_public.agent_module ( database_id );

-- Unique constraint on (database_id, membership_type) using COALESCE to handle NULLs.
-- NULL membership_type = app-level, non-NULL = entity-scoped.
-- Only one agent module per scope.
CREATE UNIQUE INDEX agent_module_unique_scope ON metaschema_modules_public.agent_module ( database_id, COALESCE(membership_type, -1) );
-- Unique constraint on (database_id, membership_type, key) using COALESCE to handle NULLs.
-- NULL membership_type = app-level, non-NULL = entity-scoped. key discriminates
-- multiple agent modules for the same scope (e.g. 'support' + 'internal').
CREATE UNIQUE INDEX agent_module_unique_scope ON metaschema_modules_public.agent_module ( database_id, COALESCE(membership_type, -1), key );

COMMIT;
Loading
Loading