postgres-best-practices
PostgreSQL Best Practices (psycopg, no ORM)
Core rules for every piece of database code in this project:
- No ORM — use psycopg directly.
- Inline SQL — trivial queries of 4 lines or fewer may be written inline in Python. Anything with JOINs, subqueries, CTEs, aggregations, or multiple conditions must live in its own `.sql` file.
- Named parameters — always `%(name)s` style, never positional `%s`.
- SQL formatting — all `.sql` files are formatted with shandy-sqlfmt.
- Dynamic SQL — check `pyproject.toml` for the Python version; use psycopg t-string templates on 3.14+, otherwise `psycopg.sql`.
- Result mapping — every query result maps to a Pydantic model defined in a `{topic}_models.py` file.
Project layout
```
app/
├── db/
│   ├── connection.py            # pool factory + get_pg_connection()
│   ├── postgres.py              # generic pg_* CRUD helpers
│   ├── loader.py                # load_sql() helper
│   ├── queries/
│   │   ├── users/
│   │   │   ├── get_user_by_id.sql
│   │   │   └── list_active_users.sql
│   │   └── orders/
│   │       └── list_orders_by_user.sql
│   └── repositories/
│       ├── user_repository.py
│       └── order_repository.py
├── models/
│   ├── user_models.py           # Pydantic models for user domain
│   └── order_models.py          # Pydantic models for order domain
```
SQL files live under db/queries/<topic>/. Every custom query gets its own file — no multi-statement files that lump unrelated queries together.
Inline SQL example (acceptable — simple, ≤ 4 lines):
await cur.execute("SELECT id, name FROM users WHERE id = %(id)s", {"id": user_id})
Not acceptable inline (use a .sql file):
```python
# Too complex — JOIN + condition + ORDER BY
await cur.execute("""
    SELECT u.id, u.name, o.total
    FROM users u
    JOIN orders o ON o.user_id = u.id
    WHERE u.active = true
    ORDER BY o.created_at DESC
""")
```
Connection & pool
The env-var prefix (POSTGRES_) should be adjusted to match the project (e.g. APP_POSTGRES_, MDM_POSTGRES_).
`_pool` is intentionally module-level mutable state.
```python
# db/connection.py
from __future__ import annotations

import os

from psycopg_pool import AsyncConnectionPool

_PREFIX = os.getenv("PG_ENV_PREFIX", "POSTGRES_")


def _dsn() -> str:
    p = _PREFIX
    return (
        f"host={os.environ[p + 'HOST']} "
        f"port={os.environ[p + 'PORT']} "
        f"dbname={os.environ[p + 'DB']} "
        f"user={os.environ[p + 'USER']} "
        f"password={os.environ[p + 'PASSWORD']}"
    )


_pool: AsyncConnectionPool | None = None


async def open_pool() -> None:
    """Create and open the pool once; safe to call more than once."""
    global _pool
    if _pool is None:
        _pool = AsyncConnectionPool(
            conninfo=_dsn(),  # call _dsn(), don't pass the function itself
            open=False,
            max_size=40,
            check=AsyncConnectionPool.check_connection,
        )
        await _pool.open()


def get_pg_connection():
    """Return an async connection context manager from the pool."""
    assert _pool is not None, "Call open_pool() first (e.g. in app startup)."
    return _pool.connection()
```
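A minimal usage sketch — `app_startup` and `fetch_user_name` are hypothetical names here; wire the startup call to your framework's lifespan/startup event:

```python
# Open the pool once at startup, then borrow connections per request.
async def app_startup() -> None:
    await open_pool()


async def fetch_user_name(user_id: int) -> str | None:
    async with get_pg_connection() as conn:  # connection returns to the pool on exit
        async with conn.cursor() as cur:
            await cur.execute("SELECT name FROM users WHERE id = %(id)s", {"id": user_id})
            row = await cur.fetchone()
            return row[0] if row else None
```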
Base model — PostgresTableModel
Models that map 1:1 to a DB table extend PostgresTableModel. This lets the generic helpers below know which table and which columns are PKs:
```python
# db/pg_base.py
from abc import ABC
from typing import Sequence

from pydantic import BaseModel


class PostgresTableModel(BaseModel, ABC):
    @staticmethod
    def get_table_name() -> tuple[str, str]:
        """Return (schema, table) e.g. ('public', 'users')."""
        ...

    @staticmethod
    def get_primary_key() -> Sequence[str]:
        """Return the PK column name(s) as a sequence."""
        ...
```
Topic models that represent a full table row inherit from PostgresTableModel:
```python
# models/user_models.py
from datetime import datetime
from typing import Sequence

from db.pg_base import PostgresTableModel


class UserRow(PostgresTableModel):
    id: int
    email: str
    display_name: str
    created_at: datetime

    @staticmethod
    def get_table_name() -> tuple[str, str]:
        return ("public", "users")

    @staticmethod
    def get_primary_key() -> Sequence[str]:
        return ["id"]
```
Models that represent partial results (e.g. from a JOIN) just extend BaseModel directly — they are not table models.
Generic CRUD helpers — db/postgres.py
Copy this file into the project and strip any project-specific bits. These helpers cover the common CRUD patterns so you don't have to write boilerplate SQL for simple operations:
```python
# db/postgres.py
from __future__ import annotations

from typing import Any, Callable, Mapping, Optional, Sequence, Type, TypeVar

from psycopg.connection_async import AsyncConnection
from psycopg.rows import dict_row
from psycopg.sql import SQL, Identifier, Placeholder

from db.pg_base import PostgresTableModel

T = TypeVar("T", bound=PostgresTableModel)


async def pg_retrieve(con: AsyncConnection, data_type: Type[T], pks: dict) -> T | None:
    """Fetch a single row by primary key(s)."""
    # Identifiers are composed via psycopg.sql — no f-string SQL.
    query = SQL("SELECT * FROM {tbl} WHERE {where}").format(
        tbl=Identifier(*data_type.get_table_name()),
        where=SQL(" AND ").join(
            SQL("{col} = {val}").format(col=Identifier(k), val=Placeholder(k)) for k in pks
        ),
    )
    async with con.cursor(row_factory=dict_row) as cur:
        await cur.execute(query, pks)
        row = await cur.fetchone()
        return data_type(**row) if row else None


async def pg_retrieve_many(
    con: AsyncConnection,
    data_type: Type[T],
    filters: dict,
    *,
    from_dict: Optional[Callable[[Mapping], T]] = None,
) -> Sequence[T]:
    """Fetch multiple rows matching all filter key=value pairs."""
    tbl = Identifier(*data_type.get_table_name())
    if filters:
        query = SQL("SELECT * FROM {tbl} WHERE {where}").format(
            tbl=tbl,
            where=SQL(" AND ").join(
                SQL("{col} = {val}").format(col=Identifier(k), val=Placeholder(k)) for k in filters
            ),
        )
    else:
        query = SQL("SELECT * FROM {tbl}").format(tbl=tbl)
    async with con.cursor(row_factory=dict_row) as cur:
        await cur.execute(query, filters)
        rows = await cur.fetchall()
        fn = from_dict or (lambda d: data_type(**d))
        return [fn(r) for r in rows]

async def pg_insert(con: AsyncConnection, table_name: tuple[str, str], data: dict) -> dict[str, Any]:
    """Insert one row and return the full row (RETURNING *)."""
    query = SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals}) RETURNING *").format(
        tbl=Identifier(*table_name),
        cols=SQL(", ").join(Identifier(k) for k in data),
        vals=SQL(", ").join(Placeholder(k) for k in data),
    )
    async with con.cursor(row_factory=dict_row) as cur:
        await cur.execute(query, data)
        row = await cur.fetchone()
        assert row is not None
        return row


async def pg_update_dict(
    con: AsyncConnection,
    table_name: tuple[str, str],
    data: dict,
    primary_keys: Sequence[str],
) -> Any | None:
    """Update a row identified by primary_keys. Returns the raw row tuple."""
    set_parts = [
        SQL("{col} = {val}").format(col=Identifier(k), val=Placeholder(k))
        for k in data
        if k not in primary_keys
    ]
    where_parts = [
        SQL("{col} = {val}").format(col=Identifier(pk), val=Placeholder(pk))
        for pk in primary_keys
    ]
    query = SQL("UPDATE {tbl} SET {sets} WHERE {where} RETURNING *").format(
        tbl=Identifier(*table_name),
        sets=SQL(", ").join(set_parts),
        where=SQL(" AND ").join(where_parts),
    )
    async with con.cursor() as cur:
        await cur.execute(query, data)
        return await cur.fetchone()


async def pg_update(con: AsyncConnection, data: T, data_type: type[T]) -> Any | None:
    """Update a typed model instance."""
    return await pg_update_dict(con, data_type.get_table_name(), data.model_dump(), data_type.get_primary_key())


async def pg_upsert_dict(
    con: AsyncConnection,
    table_name: tuple[str, str],
    data: dict,
    primary_keys: Sequence[str],
) -> dict:
    """INSERT … ON CONFLICT … DO UPDATE, returns the row as a dict."""
    fields = list(data)
    updates = [SQL("{col} = EXCLUDED.{col}").format(col=Identifier(k)) for k in fields]
    query = SQL(
        "INSERT INTO {tbl} ({cols}) VALUES ({vals}) ON CONFLICT ({pks}) DO UPDATE SET {updates} RETURNING *"
    ).format(
        tbl=Identifier(*table_name),
        cols=SQL(", ").join(Identifier(k) for k in fields),
        vals=SQL(", ").join(Placeholder(k) for k in fields),
        pks=SQL(", ").join(Identifier(pk) for pk in primary_keys),
        updates=SQL(", ").join(updates),
    )
    async with con.cursor(row_factory=dict_row) as cur:
        await cur.execute(query, data)
        row = await cur.fetchone()
        assert row is not None
        return row


async def pg_upsert(con: AsyncConnection, data: T, data_type: type[T]) -> dict:
    """Upsert a typed model instance."""
    return await pg_upsert_dict(con, data_type.get_table_name(), data.model_dump(), data_type.get_primary_key())

async def pg_upsert_many_dict(
    con: AsyncConnection,
    table_name: tuple[str, str],
    data: Sequence[dict],
    primary_keys: Sequence[str],
) -> None:
    """Batch upsert via executemany (psycopg batches the statements in pipeline mode)."""
    if not data:
        return
    fields = list(data[0])
    updates = [SQL("{col} = EXCLUDED.{col}").format(col=Identifier(k)) for k in fields if k not in primary_keys]
    query = SQL(
        "INSERT INTO {tbl} ({cols}) VALUES ({vals}) ON CONFLICT ({pks}) DO UPDATE SET {updates}"
    ).format(
        tbl=Identifier(*table_name),
        cols=SQL(", ").join(Identifier(k) for k in fields),
        vals=SQL(", ").join(Placeholder(k) for k in fields),
        pks=SQL(", ").join(Identifier(pk) for pk in primary_keys),
        updates=SQL(", ").join(updates),
    )
    async with con.cursor() as cur:
        await cur.executemany(query, data)


async def pg_upsert_many(con: AsyncConnection, data: Sequence[T], data_type: type[T]) -> None:
    """Batch upsert typed model instances."""
    await pg_upsert_many_dict(con, data_type.get_table_name(), [d.model_dump() for d in data], data_type.get_primary_key())


async def pg_insert_many(
    con: AsyncConnection,
    table_name: tuple[str, str],
    data: Sequence[dict],
) -> None:
    """Batch insert — no RETURNING; batched via executemany."""
    if not data:
        return
    fields = list(data[0])
    query = SQL("INSERT INTO {tbl} ({cols}) VALUES ({vals})").format(
        tbl=Identifier(*table_name),
        cols=SQL(", ").join(Identifier(k) for k in fields),
        vals=SQL(", ").join(Placeholder(k) for k in fields),
    )
    async with con.cursor() as cur:
        await cur.executemany(query, data)

async def pg_delete_dict(con: AsyncConnection, table_name: tuple[str, str], data: dict) -> dict | None:
    """Delete by arbitrary key dict, returns the deleted row."""
    where_parts = [SQL("{col} = {val}").format(col=Identifier(k), val=Placeholder(k)) for k in data]
    query = SQL("DELETE FROM {tbl} WHERE {where} RETURNING *").format(
        tbl=Identifier(*table_name),
        where=SQL(" AND ").join(where_parts),
    )
    async with con.cursor(row_factory=dict_row) as cur:  # dict_row replaces manual description/zip mapping
        await cur.execute(query, data)
        return await cur.fetchone()


async def pg_delete(con: AsyncConnection, data: T, data_type: type[T]) -> T | None:
    """Delete a typed model instance by its primary key(s)."""
    pk_dict = {pk: getattr(data, pk) for pk in data_type.get_primary_key()}
    row = await pg_delete_dict(con, data_type.get_table_name(), pk_dict)
    return data_type.model_validate(row) if row else None
```
Use these for simple CRUD. For anything that needs a custom WHERE clause, a join, aggregation, or ordering — write a dedicated .sql file and a repository method.
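For example, a simple fetch-modify-update flow built entirely from the helpers (a sketch reusing the `UserRow` model from above; `rename_user` is a hypothetical call site):

```python
from db.connection import get_pg_connection
from db.postgres import pg_retrieve, pg_update
from models.user_models import UserRow


async def rename_user(user_id: int, new_name: str) -> UserRow | None:
    """Load a row as a typed model, mutate it, write it back."""
    async with get_pg_connection() as conn:
        user = await pg_retrieve(conn, UserRow, {"id": user_id})
        if user is None:
            return None
        user.display_name = new_name
        await pg_update(conn, user, UserRow)
        return user
```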
Loading SQL files
```python
# db/loader.py
from functools import lru_cache
from pathlib import Path

_QUERIES_ROOT = Path(__file__).parent / "queries"


@lru_cache(maxsize=None)
def load_sql(topic: str, name: str) -> str:
    """Return the SQL text for queries/<topic>/<name>.sql."""
    return (_QUERIES_ROOT / topic / f"{name}.sql").read_text()
```
Each file is read from disk once per process lifetime. Never embed SQL strings in Python.
Named parameters — %(name)s style
Use psycopg's named-parameter style with a dict argument:
```sql
-- db/queries/users/list_active_users.sql
select
    usr.id,
    usr.email,
    usr.display_name,
    usr.created_at
from users as usr
where usr.is_active = true
order by usr.created_at desc
limit %(limit)s
```
```python
await cur.execute(load_sql("users", "list_active_users"), {"limit": 50})
```
Never mix positional `%s` and named `%(name)s`. Never build SQL with f-strings or `str.format()`.
Pydantic models — {topic}_models.py
Query results that aren't full-table rows (joins, aggregations, partial selects) get their own plain BaseModel:
```python
# models/user_models.py
from __future__ import annotations

from datetime import datetime
from typing import Sequence

from pydantic import BaseModel, ConfigDict

from db.pg_base import PostgresTableModel


class UserRow(PostgresTableModel):
    """Full users table row — used with pg_* helpers."""

    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    display_name: str
    created_at: datetime

    @staticmethod
    def get_table_name() -> tuple[str, str]:
        return ("public", "users")

    @staticmethod
    def get_primary_key() -> Sequence[str]:
        return ["id"]


class UserSummary(BaseModel):
    """Partial result — doesn't map to a single table."""

    model_config = ConfigDict(from_attributes=True)

    id: int
    display_name: str
```
Repository pattern
Use pg_* helpers for simple CRUD. Write custom SQL + a repository method for anything else:
```python
# db/repositories/user_repository.py
from __future__ import annotations

from psycopg.rows import dict_row

from db.connection import get_pg_connection
from db.loader import load_sql
from db.postgres import pg_delete, pg_retrieve
from models.user_models import UserRow, UserSummary


class UserRepository:
    async def get_by_id(self, user_id: int) -> UserRow | None:
        async with get_pg_connection() as conn:
            return await pg_retrieve(conn, UserRow, {"id": user_id})

    async def list_active(self, limit: int = 100) -> list[UserSummary]:
        sql = load_sql("users", "list_active_users")
        async with get_pg_connection() as conn:
            async with conn.cursor(row_factory=dict_row) as cur:
                await cur.execute(sql, {"limit": limit})
                rows = await cur.fetchall()
                return [UserSummary.model_validate(r) for r in rows]

    async def delete(self, user: UserRow) -> UserRow | None:
        async with get_pg_connection() as conn:
            return await pg_delete(conn, user, UserRow)
```
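Calling code then depends only on the repository, never on SQL (a small sketch; `remove_user` is a hypothetical service function):

```python
repo = UserRepository()


async def remove_user(user_id: int) -> bool:
    """Fetch, then delete if present; True when a row was removed."""
    user = await repo.get_by_id(user_id)
    if user is None:
        return False
    return await repo.delete(user) is not None
```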
Dynamic SQL
Avoid dynamic SQL whenever possible — a static `.sql` file is always clearer. When column names or table names genuinely vary at runtime, check `pyproject.toml` (the project's `requires-python` under `[project]`) to determine which approach to use — decide at authoring time, not with a runtime `sys.version_info` check:
Python 3.14+ — t-string templates
T-strings look like f-strings but are evaluated by psycopg, not Python — values are always sent as bound parameters, never interpolated.
Basic parameter (default s specifier):
await cur.execute(t"SELECT * FROM users WHERE id = {user_id}")
Format specifiers:
| Specifier | Meaning |
|---|---|
| `{val}` / `{val:s}` | Bound parameter, automatic format (default) |
| `{val:b}` | Bound parameter, binary format |
| `{val:t}` | Bound parameter, text format |
| `{name:i}` | SQL identifier (table/column name) — double-quoted |
| `{val:l}` | Literal value merged client-side (use sparingly) |
| `{snippet:q}` | SQL snippet — another t-string or sql.SQL/Composed instance |
Dynamic identifier (:i):
column = "email" # comes from caller
query = t"SELECT {column:i} FROM users WHERE active = {active}"
await cur.execute(query)
NOTIFY — requires client-side composition (:i and :l):
```python
from psycopg import Connection


def send_notify(conn: Connection, channel: str, payload: str) -> None:
    conn.execute(t"NOTIFY {channel:i}, {payload:l}")
    # NOTIFY "foo.bar", 'O''Reilly'
```
Nested templates with :q (dynamic WHERE clause):
```python
from typing import Sequence

from psycopg import Connection, sql
from psycopg.rows import class_row

from models.user_models import UserRow


def search_users(
    conn: Connection,
    ids: Sequence[int] | None = None,
    name_pattern: str | None = None,
) -> list[UserRow]:
    filters = []
    if ids is not None:
        filters.append(t"u.id = ANY({list(ids)})")
    if name_pattern is not None:
        filters.append(t"u.name ~* {name_pattern}")
    if not filters:
        raise TypeError("at least one filter required")
    joined = sql.SQL(" AND ").join(filters)
    cur = conn.cursor(row_factory=class_row(UserRow))
    cur.execute(t"SELECT * FROM users AS u WHERE {joined:q}")
    return cur.fetchall()
```
Inspect the composed SQL without executing (sql.as_string):
```python
import datetime

from psycopg import sql

name = "O'Reilly"
dob = datetime.date(1970, 1, 1)
print(sql.as_string(t"INSERT INTO tbl VALUES ({name}, {dob})"))
# INSERT INTO tbl VALUES ('O''Reilly', '1970-01-01'::date)
```
Python < 3.14 — psycopg.sql
```python
from psycopg import sql

column = "email"
query = sql.SQL("SELECT {col} FROM users WHERE active = %(active)s").format(
    col=sql.Identifier(column),
)
await cur.execute(query, {"active": True})
```
`sql.Identifier` quotes the identifier at the driver level, which prevents SQL injection through column or table names. The value (`%(active)s`) always stays a bound parameter and is never interpolated into the SQL text.
Never use f-strings or string concatenation for identifiers or values.
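When the identifier comes from user input, it's also worth validating it against an allowlist before it ever reaches `sql.Identifier` (a defensive sketch — the allowlist contents and `order_by` helper are hypothetical):

```python
from psycopg import sql

# Hypothetical allowlist — keep in sync with the columns you actually allow sorting on.
ALLOWED_SORT_COLUMNS = {"created_at", "email", "display_name"}


def order_by(column: str) -> sql.Composed:
    """Build a safe ORDER BY fragment from a caller-supplied column name."""
    if column not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"unsupported sort column: {column!r}")
    return sql.SQL("ORDER BY {col}").format(col=sql.Identifier(column))
```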
SQL formatting with sqlfmt
```bash
uv add --dev "shandy-sqlfmt[jinjafmt]"
sqlfmt db/queries/          # format
sqlfmt --check db/queries/  # CI check
```
pyproject.toml:
```toml
[tool.sqlfmt]
line_length = 119
```
sqlfmt enforces lowercase keywords, trailing commas, and consistent indentation.
Avoid LATERAL JOIN — use CTEs instead
LATERAL JOIN is hard to read and often poorly optimized. Prefer a CTE that pre-aggregates or pre-filters, then join that:
Avoid:
```sql
select
    u.id,
    u.email,
    latest.total
from users as u
join lateral (
    select o.total
    from orders as o
    where o.user_id = u.id
    order by o.created_at desc
    limit 1
) as latest on true
```
Prefer (CTE with aggregation, then join):
```sql
with latest_order as (
    select
        o.user_id,
        o.total,
        row_number() over (
            partition by o.user_id order by o.created_at desc
        ) as rn
    from orders as o
)
select
    u.id,
    u.email,
    lo.total
from users as u
join latest_order as lo
    on lo.user_id = u.id
    and lo.rn = 1
```
The CTE is planned once, is easier to read, and lets the planner optimize the join independently of the subquery.
Quick checklist
- Simple CRUD uses `pg_*` helpers; custom queries use `.sql` files
- Inline SQL only for trivial queries ≤ 4 lines; anything with JOINs/CTEs/aggregations/subqueries uses a `.sql` file
- All parameters use `%(name)s` style with a dict argument
- SQL files formatted with sqlfmt
- Results mapped to a Pydantic model in `{topic}_models.py`
- Table-mapped models extend `PostgresTableModel`; partial results extend `BaseModel`
- Dynamic SQL uses t-strings (3.14+) or `psycopg.sql` — chosen at authoring time based on `pyproject.toml`
- No `LATERAL JOIN` — use a CTE that groups/aggregates first, then join it