---
title: psycopg
icon: Bug
tag: Python
description: Connect Python applications using psycopg to PgBeam for connection pooling, caching, and global routing. Includes SQLAlchemy integration.
---

Connect your Python application to PgBeam by updating the connection string. This guide covers psycopg 3, psycopg connection pools, and SQLAlchemy integration.

## Setup

### Set the connection string
```bash title="Environment variable"
export DATABASE_URL=postgresql://user:pass@abc.aws.pgbeam.app:5432/mydb
```
### Connect with psycopg
```python title="main.py"
import os
import psycopg

conn = psycopg.connect(os.environ["DATABASE_URL"])

with conn.cursor() as cur:
    cur.execute("SELECT 'hello from pgbeam'")
    print(cur.fetchone())

conn.close()
```
### Set up connection pooling (recommended)
Use psycopg's built-in pool with a small pool size:

```python title="Using ConnectionPool"
from psycopg_pool import ConnectionPool
import os

pool = ConnectionPool(
    conninfo=os.environ["DATABASE_URL"],
    min_size=1,
    max_size=5,  # PgBeam handles upstream pooling — keep this low
)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        print(cur.fetchone())
```

## Pool sizing

PgBeam handles upstream connection pooling, so keep your application-side pool small:

| Deployment type | Recommended `max_size` |
| --- | --- |
| Single process (Gunicorn) | 5-10 |
| Multiple workers | 2-3 per worker |
| Serverless (Lambda) | 1-2 |

With PgBeam in transaction pool mode, each psycopg connection only holds an upstream connection during active transactions. A small pool handles high concurrency efficiently.
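As a back-of-the-envelope check of why a small pool suffices in transaction pool mode, connections in use roughly equal transactions per second times average transaction duration (Little's law). A hypothetical sizing helper, purely illustrative:

```python
import math


def pool_size_estimate(tps: float, avg_txn_seconds: float, headroom: float = 2.0) -> int:
    """Estimate app-side pool size via Little's law, with a safety factor.

    In transaction pool mode a connection is only busy for the duration of a
    transaction, so expected concurrency = arrival rate x service time.
    """
    return max(1, math.ceil(tps * avg_txn_seconds * headroom))


# 200 txn/s at 5 ms each averages ~1 busy connection; 2 with 2x headroom
print(pool_size_estimate(200, 0.005))  # -> 2
```

Even bursty workloads at hundreds of transactions per second stay within a single-digit pool when transactions are short.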

## SQLAlchemy integration

psycopg works as the default PostgreSQL driver for SQLAlchemy. Update your engine configuration to point at PgBeam:

<Tabs items={["SQLAlchemy 2.x", "SQLAlchemy 1.x"]}>
```python title="SQLAlchemy 2.x with psycopg 3"
import os
import re

from sqlalchemy import create_engine

# postgresql+psycopg:// uses psycopg 3 as the driver
url = re.sub(r"^postgres(ql)?://", "postgresql+psycopg://", os.environ["DATABASE_URL"])
engine = create_engine(url, pool_size=5, max_overflow=0)
```
```python title="SQLAlchemy 1.x"
import os

from sqlalchemy import create_engine

engine = create_engine(os.environ["DATABASE_URL"], pool_size=5, max_overflow=0)
```
</Tabs>
Set `max_overflow=0` to prevent SQLAlchemy from creating connections beyond the pool size. PgBeam handles overflow at the proxy level.

## Caching

### Automatic caching via cache rules

For ORM queries (SQLAlchemy, Django ORM), PgBeam tracks SQL shapes automatically. Enable caching for specific shapes through Cache Rules in the dashboard.
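To see what a "SQL shape" means in practice, here is a rough sketch of the kind of literal-collapsing normalization a proxy can apply. This is illustrative only; PgBeam's actual fingerprinting is internal:

```python
import re


def sql_shape(query: str) -> str:
    """Collapse literals so structurally identical queries share one shape."""
    q = re.sub(r"'(?:[^']|'')*'", "?", query)  # string literals
    q = re.sub(r"\b\d+(\.\d+)?\b", "?", q)     # numeric literals
    return re.sub(r"\s+", " ", q).strip()


print(sql_shape("SELECT * FROM users WHERE id = 42"))
print(sql_shape("SELECT * FROM users WHERE id = 7"))
# Both normalize to: SELECT * FROM users WHERE id = ?
```

Because both queries share a shape, one cache rule covers every value the ORM binds into that query.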

### SQL annotations for fine-grained control

```python title="Cache annotations"
# Cache for 5 minutes
cur.execute("/* @pgbeam:cache maxAge=300 */ SELECT * FROM categories")

# Disable caching for a specific query
cur.execute(
    "/* @pgbeam:cache noCache */ SELECT balance FROM accounts WHERE id = %s",
    [account_id],
)
```

## Read replicas

Route reads to replicas with the `/* @pgbeam:replica */` annotation:

```python title="Replica routing"
# Route to a read replica
cur.execute("/* @pgbeam:replica */ SELECT * FROM products WHERE active = true")

# Combine with caching
cur.execute(
    "/* @pgbeam:replica */ /* @pgbeam:cache maxAge=600 */ SELECT * FROM categories"
)
```

Standard ORM queries always go to the primary. For replica routing with SQLAlchemy, use `text()` or raw SQL execution.

See Read Replicas for replica setup and routing details.

## Error handling

psycopg maps PostgreSQL SQLSTATE codes to specific exception classes:

```python title="Handling PgBeam errors"
import psycopg

try:
    cur.execute("SELECT 1")
except psycopg.errors.TooManyConnections:
    # SQLSTATE 53300 — connection limit exceeded
    # Reduce pool size or upgrade plan
    pass
except psycopg.errors.ConfigurationLimitExceeded:
    # SQLSTATE 53400 — query rate limit exceeded
    # Enable caching or upgrade plan
    pass
except psycopg.OperationalError as e:
    if "circuit breaker" in str(e):
        # SQLSTATE 08006 — upstream unavailable
        # Retry with backoff
        pass
```
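The circuit-breaker branch calls for retrying with backoff. A minimal generic sketch (not PgBeam-specific; tune retries and delays for your workload):

```python
import random
import time


def retry_with_backoff(fn, retries=5, base_delay=0.1, retry_on=(Exception,)):
    """Call fn(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(retries):
        try:
            return fn()
        except retry_on:
            if attempt == retries - 1:
                raise  # out of retries: let the caller handle it
            # exponential backoff with full jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** attempt) * random.random())
```

For example, `retry_with_backoff(run_query, retry_on=(psycopg.OperationalError,))` retries only on connection-level failures while letting query errors surface immediately.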

See Error Codes for the full reference.

## Django integration

Django uses psycopg as its default PostgreSQL backend. Update `DATABASES` in `settings.py`:

```python title="settings.py"
import os

import dj_database_url

DATABASES = {
    "default": dj_database_url.config(
        default=os.environ["DATABASE_URL"],
        conn_max_age=0,  # Django's native pool requires non-persistent connections
    )
}

# Django 5.1+ connection pool (requires psycopg[pool]); keep it small,
# since PgBeam pools upstream
DATABASES["default"]["OPTIONS"] = {
    "pool": {
        "min_size": 1,
        "max_size": 5,
    }
}
```

## Migrations

Run migrations directly against your origin database:

```bash title="Migrations"
# Alembic
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" alembic upgrade head

# Django
DATABASE_URL="postgresql://user:pass@db.example.com:5432/mydb" python manage.py migrate
```

## Debugging

Enable debug mode to see cache and routing details:

```python title="Debug mode"
cur.execute("SET pgbeam.debug = on")
cur.execute("SELECT * FROM users WHERE id = %s", [user_id])
# NOTICE: pgbeam: cache=hit age=12s ttl=60s swr=30s
```

## Common issues

| Issue | Cause | Fix |
| --- | --- | --- |
| "too many connections" | Pool too large | Set `max_size=5` in `ConnectionPool` |
| `OperationalError` on connect | Cold start after inactivity | Normal; the first connection is slower |
| Stale data after writes | Cache returning old results | Adjust TTL or use the `noCache` annotation |

## Further reading