Read with 30s visibility timeout
April 30, 2026 ยท View on GitHub
What is PGMQ?
A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.
PGMQ (Postgres Message Queue) is a message queue built on Postgres. It provides reliable, transactional message processing with the familiarity of SQL. The pgmq Python library exposes a clean, unified API for interacting with PGMQ across four different database backends.
Prerequisites
A running PostgreSQL instance with the PGMQ extension installed.
Docker (recommended)
The fastest way to get started is with the pre-built Docker image:
docker run -d --name pgmq-postgres \
-e POSTGRES_PASSWORD=postgres \
-p 5432:5432 \
ghcr.io/pgmq/pg18-pgmq:latest
Then connect and enable PGMQ:
psql postgres://postgres:postgres@localhost:5432/postgres -c "CREATE EXTENSION pgmq;"
SQL Only
You can also install PGMQ's objects directly into the pgmq schema. Use this on hosted Postgres services that do not support custom extensions.
git clone https://github.com/pgmq/pgmq.git
cd pgmq
psql -f pgmq-extension/sql/pgmq.sql postgres://postgres:postgres@localhost:5432/postgres
Installation
pip install pgmq
Optional backends:
| Extra | Backend |
|---|---|
pgmq[async] | asyncpg |
pgmq[sqlalchemy] | SQLAlchemy (sync) |
pgmq[sqlalchemy-async] | SQLAlchemy (async) |
Features
Lightweight โ No background workers or external dependencies. Just Postgres SQL objects.
Exactly-once delivery โ Guaranteed delivery to a single consumer within a visibility timeout.
Four identical APIs โ Swap between sync (psycopg), async (asyncpg), sync SQLAlchemy, and async SQLAlchemy with minimal changes.
Queue management โ Create, drop, list, purge, and partition queues.
Message operations โ Send, read, archive, delete, pop. Batch operations for high throughput.
FIFO queues โ Ordered processing with message group keys.
Topic routing โ Pattern-based bindings for publish-subscribe and content-based routing.
Visibility timeouts โ Control how long a message stays hidden after reading.
Notifications โ PostgreSQL NOTIFY/LISTEN for real-time message arrival events.
Transactions โ Decorators and manual connection injection for complex workflows.
Structured logging โ stdlib logging with optional loguru backend.
Documentation
- Getting Started โ Installation, Docker setup, and first messages
- Configuration โ Environment variables and connection strings
- Clients โ Choosing and initializing backends
- Transactions โ Transaction decorators and manual connections
- Topic Routing โ Pattern-based message routing
- Notifications โ Real-time NOTIFY/LISTEN listeners
Quick Start
Sync (psycopg):
from pgmq import PGMQueue
queue = PGMQueue() # reads PG_* env vars by default
# Create a queue
queue.create_queue("my_queue")
# Send a message
msg_id = queue.send("my_queue", {"hello": "world"})
# Send a batch
batch_ids = queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])
# Read with 30s visibility timeout
msg = queue.read("my_queue", vt=30)
print(msg.message) # {'hello': 'world'}
# Archive when done
queue.archive("my_queue", msg.msg_id)
Async (asyncpg):
from pgmq import AsyncPGMQueue
queue = AsyncPGMQueue()
await queue.init()
# Create a queue
await queue.create_queue("my_queue")
# Send a message
msg_id = await queue.send("my_queue", {"hello": "world"})
# Send a batch
batch_ids = await queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])
# Read with 30s visibility timeout
msg = await queue.read("my_queue", vt=30)
print(msg.message) # {'hello': 'world'}
# Archive when done
await queue.archive("my_queue", msg.msg_id)
Development
# Install dependencies
uv sync --all-groups --all-extras
# Run tests (spins up Docker Postgres automatically)
make test
# Run lints
make lint
# Serve docs locally
make docs-serve
License
Apache-2.0