|
| 1 | +# Symfony PostgreSQL Messenger Bridge |
| 2 | + |
| 3 | +A Symfony Messenger transport backed by Flow PHP's native PostgreSQL library. Replaces `symfony/doctrine-messenger` without requiring Doctrine DBAL — messages are stored directly in PostgreSQL using Flow's query builder and client. |
| 4 | + |
| 5 | +- [Back](/documentation/introduction.md) |
| 6 | +- [Packagist](https://packagist.org/packages/flow-php/symfony-postgresql-messenger-bridge) |
| 7 | +- [GitHub](https://github.com/flow-php/symfony-postgresql-messenger-bridge) |
| 8 | +- [API Reference](/documentation/api/bridge/symfony-postgresql-messenger) |
| 9 | + |
| 10 | +[TOC] |
| 11 | + |
| 12 | +## Installation |
| 13 | + |
| 14 | +```bash |
| 15 | +composer require flow-php/symfony-postgresql-messenger-bridge:~--FLOW_PHP_VERSION-- |
| 16 | +``` |
| 17 | + |
| 18 | +This package is used together with the [Symfony PostgreSQL Bundle](/documentation/components/bridges/symfony-postgresql-bundle.md), which registers the transport factory and catalog provider automatically. |
| 19 | + |
| 20 | +## How It Works |
| 21 | + |
| 22 | +The bridge provides a Symfony Messenger transport that stores messages in a PostgreSQL table using Flow's native `Client`. No Doctrine DBAL is involved. |
| 23 | + |
| 24 | +- **Sending** inserts a row with `body`, `headers`, `queue_name`, `created_at`, and `available_at` |
| 25 | +- **Receiving** uses `SELECT ... FOR UPDATE SKIP LOCKED` inside a transaction for safe concurrent consumption |
| 26 | +- **Delayed messages** offset `available_at` by the `DelayStamp` value (in milliseconds) |
| 27 | +- **Redelivery** of timed-out messages is handled automatically based on `redeliver_timeout` |
| 28 | +- **Keepalive** refreshes `delivered_at` to prevent redelivery during long-running handlers |
| 29 | + |
| 30 | +## Configuration |
| 31 | + |
| 32 | +### 1. Enable Messenger |
| 33 | + |
| 34 | +In your `flow_postgresql` configuration, enable messenger at the root level: |
| 35 | + |
| 36 | +```yaml |
| 37 | +# config/packages/flow_postgresql.yaml |
| 38 | +flow_postgresql: |
| 39 | + connections: |
| 40 | + default: |
| 41 | + dsn: '%env(DATABASE_URL)%' |
| 42 | + |
| 43 | + messenger: |
| 44 | + enabled: true |
| 45 | + table_name: messenger_messages # default |
| 46 | + schema: public # default |
| 47 | +``` |
| 48 | +
|
| 49 | +This registers a `MessengerCatalogProvider` and a transport factory that can resolve any configured connection by name. The `messenger_messages` table will be included in schema diffs and migrations automatically. |
| 50 | + |
| 51 | +### 2. Configure the Messenger Transport |
| 52 | + |
| 53 | +```yaml |
| 54 | +# config/packages/messenger.yaml |
| 55 | +framework: |
| 56 | + messenger: |
| 57 | + transports: |
| 58 | + async: |
| 59 | + dsn: 'flow-pgsql://default' |
| 60 | + options: |
| 61 | + queue_name: default # default |
| 62 | + redeliver_timeout: 3600 # seconds, default |
| 63 | + routing: |
| 64 | + App\Message\MyMessage: async |
| 65 | +``` |
| 66 | + |
| 67 | +The DSN format is `flow-pgsql://<connection_name>` where `<connection_name>` matches a connection defined under `flow_postgresql.connections`. Both `flow-pgsql://` and `flow-postgresql://` schemes are supported. |
| 68 | + |
| 69 | +### 3. Create the Table via Migrations |
| 70 | + |
| 71 | +Because the catalog provider is registered automatically, the messenger table appears in migration diffs: |
| 72 | + |
| 73 | +```bash |
| 74 | +php bin/console flow:migrations:diff |
| 75 | +php bin/console flow:migrations:migrate |
| 76 | +``` |
| 77 | + |
| 78 | +This is the recommended approach. The bridge does **not** implement `SetupableTransportInterface` — there is no `messenger:setup-transports` support. Use migrations for schema management. |
| 79 | + |
| 80 | +## DSN Options |
| 81 | + |
| 82 | +Options can be set in the DSN query string or in the `options` array under the transport configuration. The `options` array takes precedence over DSN query parameters. |
| 83 | + |
| 84 | +| Option | Default | Description | |
| 85 | +|--------|---------|-------------| |
| 86 | +| `queue_name` | `default` | Queue name for message routing | |
| 87 | +| `table_name` | `messenger_messages` | Table storing messages (must match `flow_postgresql.messenger` config) | |
| 88 | +| `schema` | `public` | Schema owning the table (must match `flow_postgresql.messenger` config) | |
| 89 | +| `redeliver_timeout` | `3600` | Seconds before a delivered-but-unacknowledged message becomes eligible for redelivery | |
| 90 | + |
| 91 | +Example with DSN options: |
| 92 | + |
| 93 | +```yaml |
| 94 | +framework: |
| 95 | + messenger: |
| 96 | + transports: |
| 97 | + high_priority: |
| 98 | + dsn: 'flow-pgsql://default?queue_name=high&redeliver_timeout=1800' |
| 99 | +``` |
| 100 | + |
| 101 | +## Multiple Queues |
| 102 | + |
| 103 | +Multiple transports can share the same table but use different queue names: |
| 104 | + |
| 105 | +```yaml |
| 106 | +framework: |
| 107 | + messenger: |
| 108 | + transports: |
| 109 | + async: |
| 110 | + dsn: 'flow-pgsql://default' |
| 111 | + options: |
| 112 | + queue_name: default |
| 113 | + high_priority: |
| 114 | + dsn: 'flow-pgsql://default' |
| 115 | + options: |
| 116 | + queue_name: high |
| 117 | + routing: |
| 118 | + App\Message\ImportantMessage: high_priority |
| 119 | + App\Message\BackgroundTask: async |
| 120 | +``` |
| 121 | + |
| 122 | +## Table Schema |
| 123 | + |
| 124 | +The messenger table has the following structure: |
| 125 | + |
| 126 | +| Column | Type | Description | |
| 127 | +|--------|------|-------------| |
| 128 | +| `id` | `bigint GENERATED ALWAYS AS IDENTITY` | Primary key | |
| 129 | +| `body` | `text` | Serialized message body | |
| 130 | +| `headers` | `text` | JSON-encoded message headers (stamps, type info) | |
| 131 | +| `queue_name` | `varchar(190)` | Queue name for message routing | |
| 132 | +| `created_at` | `timestamptz` | When the message was dispatched | |
| 133 | +| `available_at` | `timestamptz` | When the message becomes available for consumption | |
| 134 | +| `delivered_at` | `timestamptz` | When a worker picked up the message (null = not yet delivered) | |
| 135 | + |
| 136 | +Three indexes are created: on `queue_name`, `available_at`, and `delivered_at`. |
0 commit comments