pg-messaging
Postgres Messaging Primitives
Pure-SQL pub/sub and task queue implementations using PostgreSQL. No external message brokers required. All logic lives in SQL — clients connect directly, or through a thin HTTP/gRPC layer.
Core Principles
- SQL is the API: All messaging logic is implemented as SQL queries, not application code
- Atomic by default: Writes reserve offsets and insert messages in a single transaction
- Consumer groups: Log-based pub/sub with per-group offset tracking, inspired by Kafka
- Lock-free queues: SELECT FOR UPDATE SKIP LOCKED for contention-free job claiming
- Flexible access: Direct Postgres connections, HTTP wrapper, or auto-generated API (PostgREST)
Pub/Sub System
A log-based pub/sub where each topic has monotonically increasing offsets and each consumer group tracks its own read position.
Schema
-- Tracks the next available offset for each topic
CREATE TABLE log_counter (
id INT PRIMARY KEY,
next_offset BIGINT NOT NULL
);
-- Initialize counter for topic 0 (add more for additional topics)
INSERT INTO log_counter (id, next_offset) VALUES (0, 1);
-- Topic log table
CREATE TABLE topic (
id BIGSERIAL PRIMARY KEY,
topic_id INT NOT NULL,
c_offset BIGINT NOT NULL,
payload BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (topic_id, c_offset)
);
-- Consumer group offset tracking (per topic)
CREATE TABLE consumer_offsets (
group_id TEXT NOT NULL,
topic_id INT NOT NULL,
next_offset BIGINT NOT NULL DEFAULT 1,
PRIMARY KEY (group_id, topic_id)
);
Publish Messages
Atomically reserves offsets and inserts messages in a single transaction:
-- Parameters:
-- $1 = number of messages (batch size)
-- $2 = array of message payloads (bytea[])
-- $3 = topic_id (int)
WITH reserve AS (
UPDATE log_counter
SET next_offset = next_offset + $1
WHERE id = $3::int
RETURNING (next_offset - $1) AS first_off
)
INSERT INTO topic(topic_id, c_offset, payload)
SELECT $3::int, r.first_off + p.ord - 1, p.payload
FROM reserve r,
unnest($2::bytea[]) WITH ORDINALITY AS p(payload, ord);
Initialize Consumer Group
Consumer groups must be initialized before first use:
INSERT INTO consumer_offsets (group_id, topic_id, next_offset)
VALUES ('my-consumer-group', 0, 1)
ON CONFLICT (group_id, topic_id) DO NOTHING;
Claim Offsets
Atomically claims a range of offsets for a consumer group:
-- Parameters:
-- $1 = consumer group_id (text)
-- $2 = max number of messages to claim (bigint)
-- $3 = topic_id (int)
WITH counter_tip AS (
SELECT (next_offset - 1) AS highest_committed_offset
FROM log_counter
WHERE id = $3::int
),
to_claim AS (
SELECT
c.group_id,
c.next_offset AS n0,
LEAST($2::bigint, GREATEST(0,
(SELECT highest_committed_offset FROM counter_tip) -
c.next_offset + 1)) AS delta
FROM consumer_offsets c
WHERE c.group_id = $1::text AND c.topic_id = $3::int
FOR UPDATE
),
upd AS (
UPDATE consumer_offsets c
SET next_offset = c.next_offset + t.delta
FROM to_claim t
WHERE c.group_id = t.group_id AND c.topic_id = $3::int
RETURNING t.n0 AS claimed_start_offset,
(c.next_offset - 1) AS claimed_end_offset
)
SELECT claimed_start_offset, claimed_end_offset FROM upd;
Fetch Messages
After claiming offsets, retrieve the actual messages:
-- Parameters:
-- $1 = start offset (from claim operation)
-- $2 = end offset (from claim operation)
-- $3 = topic_id (int)
SELECT c_offset, payload, created_at
FROM topic
WHERE topic_id = $3::int AND c_offset BETWEEN $1 AND $2
ORDER BY c_offset;
Queue System
A task queue using SELECT FOR UPDATE SKIP LOCKED for lock-free job claiming
across multiple workers.
Schema
CREATE TABLE queue (
id BIGSERIAL PRIMARY KEY,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE queue_archive (
id BIGINT,
payload BYTEA NOT NULL,
created_at TIMESTAMP NOT NULL,
processed_at TIMESTAMP NOT NULL DEFAULT NOW()
);
Enqueue
INSERT INTO queue (payload) VALUES ($1);
Claim and Process
Run within a single transaction:
BEGIN;
-- Claim a job (lock-free with SKIP LOCKED)
SELECT id, payload, created_at
FROM queue
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
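-- Process the job in application code here; the row lock taken by FOR UPDATE
-- is held until COMMIT, so other workers' SKIP LOCKED scans pass over this job.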
-- Archive the processed job
DELETE FROM queue WHERE id = $1;
INSERT INTO queue_archive (id, payload, created_at, processed_at)
VALUES ($1, $2, $3, NOW());
COMMIT;
A single-statement CTE version for sqlc compatibility (sqlc expects one statement per query):
WITH deleted AS (
DELETE FROM queue WHERE id = $1 RETURNING *
)
INSERT INTO queue_archive (id, payload, created_at)
SELECT id, payload, created_at FROM deleted;
Using with sqlc
Both systems work with sqlc out of the box. Example query annotations:
-- name: PublishMessages :exec
WITH reserve AS (
UPDATE log_counter
SET next_offset = next_offset + $1
WHERE id = $3::int
RETURNING (next_offset - $1) AS first_off
)
INSERT INTO topic(topic_id, c_offset, payload)
SELECT $3::int, r.first_off + p.ord - 1, p.payload
FROM reserve r,
unnest($2::bytea[]) WITH ORDINALITY AS p(payload, ord);
-- name: ClaimOffsets :one
WITH counter_tip AS (
SELECT (next_offset - 1) AS highest_committed_offset
FROM log_counter WHERE id = $3::int
),
to_claim AS (
SELECT c.group_id, c.next_offset AS n0,
LEAST($2::bigint, GREATEST(0,
(SELECT highest_committed_offset FROM counter_tip) - c.next_offset + 1
)) AS delta
FROM consumer_offsets c
WHERE c.group_id = $1::text AND c.topic_id = $3::int
FOR UPDATE
),
upd AS (
UPDATE consumer_offsets c
SET next_offset = c.next_offset + t.delta
FROM to_claim t
WHERE c.group_id = t.group_id AND c.topic_id = $3::int
RETURNING t.n0 AS claimed_start_offset, (c.next_offset - 1) AS claimed_end_offset
)
SELECT claimed_start_offset, claimed_end_offset FROM upd;
-- name: GetMessages :many
SELECT c_offset, payload, created_at
FROM topic
WHERE topic_id = $3::int AND c_offset BETWEEN $1 AND $2
ORDER BY c_offset;
-- name: EnqueueJob :exec
INSERT INTO queue (payload) VALUES ($1);
-- name: ClaimJob :one
SELECT id, payload, created_at
FROM queue
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- name: ArchiveJob :exec
WITH deleted AS (
DELETE FROM queue WHERE id = $1 RETURNING *
)
INSERT INTO queue_archive (id, payload, created_at)
SELECT id, payload, created_at FROM deleted;
Architecture Patterns
Direct Postgres Connection
Clients use native drivers (pgx, psycopg2, node-postgres) and execute the SQL directly. Simplest architecture — no additional services. Best when clients already have database access.
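For example, publishing over a direct connection is just the publish statement from above with literal values substituted for the $n parameters (topic 0 and the hex payloads here are illustrative):
-- Publish a 2-message batch to topic 0
-- ($1 = 2, $2 = the payload array, $3 = 0 from the parameterized version)
WITH reserve AS (
  UPDATE log_counter
  SET next_offset = next_offset + 2
  WHERE id = 0
  RETURNING (next_offset - 2) AS first_off
)
INSERT INTO topic (topic_id, c_offset, payload)
SELECT 0, r.first_off + p.ord - 1, p.payload
FROM reserve r,
     unnest(ARRAY['\x01'::bytea, '\x02'::bytea]) WITH ORDINALITY AS p(payload, ord);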
HTTP/API Wrapper
A thin Go/Python/Rust service wraps the SQL and exposes REST/gRPC endpoints:
POST /pubsub/publish
GET /pubsub/consume
POST /queue/enqueue
GET /queue/claim
This provides a better security boundary, centralized connection pooling, and a place to enforce authentication.
Operational Considerations
- Topics: Create an entry in log_counter for each topic (topic_id 0, 1, 2, etc.)
- Connection pooling: Essential for performance; use pgbouncer or similar
- Monitoring: Track consumer lag per topic, queue depth, and message throughput (see the sketch after this list)
- Vacuuming: Regular VACUUM on queue and archive tables to reclaim space
- Indexes: The UNIQUE constraint on (topic_id, c_offset) creates an index automatically
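A minimal monitoring sketch, assuming these ad-hoc queries are run by an external job; they are not part of the schema above:
-- Consumer lag per group and topic: committed messages not yet claimed
SELECT c.group_id,
       c.topic_id,
       l.next_offset - c.next_offset AS lag
FROM consumer_offsets c
JOIN log_counter l ON l.id = c.topic_id;
-- Queue depth: jobs waiting to be claimed
SELECT count(*) AS queue_depth FROM queue;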
Scaling
- Vertical: 4vCPU handles ~5k writes/s and ~25k reads/s; 96vCPU handles ~243k writes/s and ~1.2M reads/s
- Read replicas: Offload consumer reads to replicas
- Multiple topics: Already supported; add more topic_ids to log_counter
- Topic partitioning: Split a high-volume topic across multiple physical tables to reduce lock contention on a single log_counter entry (a sketch follows this list)
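A minimal sketch of one way to partition a hot topic, under assumptions not in the schema above: each partition of the logical topic is modelled as its own topic_id with its own counter row, and the topic table is LIST-partitioned so each partition lives in its own physical table. Publishers hash a message key to a partition id; consumers read each partition like a separate topic.
-- Assumption: logical topic "orders" is split into partitions with ids 100-103.
-- Each partition gets its own counter row, so publishers to different partitions
-- no longer contend on a single log_counter entry.
INSERT INTO log_counter (id, next_offset)
SELECT p, 1 FROM generate_series(100, 103) AS p;
-- Optional: one physical table per partition via LIST partitioning.
-- (The surrogate id column is omitted; a unique constraint on a partitioned
-- table must include the partition key, which (topic_id, c_offset) does.)
CREATE TABLE topic_partitioned (
  topic_id INT NOT NULL,
  c_offset BIGINT NOT NULL,
  payload BYTEA NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  UNIQUE (topic_id, c_offset)
) PARTITION BY LIST (topic_id);
CREATE TABLE topic_orders_p0 PARTITION OF topic_partitioned FOR VALUES IN (100);
CREATE TABLE topic_orders_p1 PARTITION OF topic_partitioned FOR VALUES IN (101);
CREATE TABLE topic_orders_p2 PARTITION OF topic_partitioned FOR VALUES IN (102);
CREATE TABLE topic_orders_p3 PARTITION OF topic_partitioned FOR VALUES IN (103);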
Key Patterns
- Always initialize consumer groups before first read
- Publish uses a two-phase approach: reserve offsets in log_counter, then bulk insert
- Claim + fetch are separate operations: claim is the atomic coordination point, fetch is a simple range query
- Queue claim uses SKIP LOCKED so blocked workers never wait; they just get the next available job
- Archive processed queue jobs rather than deleting them to preserve audit history