
April 30, 2026

PGMQ Python Client


The official Python client for PGMQ.

📖 Documentation · 💻 Source


What is PGMQ?

A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.

PGMQ (Postgres Message Queue) is a message queue built on Postgres. It provides reliable, transactional message processing with the familiarity of SQL. The pgmq Python library exposes a clean, unified API for interacting with PGMQ across four different database backends.


Prerequisites

A running PostgreSQL instance with the PGMQ extension installed.

The fastest way to get started is with the pre-built Docker image:

docker run -d --name pgmq-postgres \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 \
  ghcr.io/pgmq/pg18-pgmq:latest

Then connect and enable PGMQ:

psql postgres://postgres:postgres@localhost:5432/postgres -c "CREATE EXTENSION pgmq;"

SQL Only

You can also install PGMQ's objects directly into the pgmq schema. Use this on hosted Postgres services that do not support custom extensions.

git clone https://github.com/pgmq/pgmq.git
cd pgmq
psql -f pgmq-extension/sql/pgmq.sql postgres://postgres:postgres@localhost:5432/postgres

Installation

pip install pgmq

Optional backends:

Extra                    Backend
pgmq[async]              asyncpg
pgmq[sqlalchemy]         SQLAlchemy (sync)
pgmq[sqlalchemy-async]   SQLAlchemy (async)

Features

Lightweight — No background workers or external dependencies. Just Postgres SQL objects.

Exactly-once delivery — Guaranteed delivery to a single consumer within a visibility timeout.

Four identical APIs — Swap between sync (psycopg), async (asyncpg), sync SQLAlchemy, and async SQLAlchemy with minimal changes.

Queue management — Create, drop, list, purge, and partition queues.

Message operations — Send, read, archive, delete, pop. Batch operations for high throughput.

FIFO queues — Ordered processing with message group keys.

Topic routing — Pattern-based bindings for publish-subscribe and content-based routing.

Visibility timeouts — Control how long a message stays hidden after reading.

Notifications — PostgreSQL NOTIFY/LISTEN for real-time message arrival events.

Transactions — Decorators and manual connection injection for complex workflows.

Structured logging — stdlib logging with optional loguru backend.
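The visibility-timeout rule above can be sketched as plain logic. This is an illustrative sketch of the semantics only, not the library's implementation: a message read with `vt=N` stays hidden from other consumers until N seconds after the read.

```python
from datetime import datetime, timedelta, timezone

# Illustrative sketch of the visibility-timeout rule (not the library's code):
# a message read with vt=N is hidden until read time + N seconds, after which
# it becomes readable again unless it was archived or deleted.
def visible_again_at(read_time: datetime, vt: int) -> datetime:
    """When a read-but-unacknowledged message becomes readable again."""
    return read_time + timedelta(seconds=vt)

read_time = datetime(2026, 4, 30, 12, 0, 0, tzinfo=timezone.utc)
deadline = visible_again_at(read_time, vt=30)
print(deadline.isoformat())  # 2026-04-30T12:00:30+00:00
```

If the consumer archives or deletes the message before the deadline, no one else ever sees it; if it crashes, the message reappears for the next `read` call.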


Documentation

  • Getting Started — Installation, Docker setup, and first messages
  • Configuration — Environment variables and connection strings
  • Clients — Choosing and initializing backends
  • Transactions — Transaction decorators and manual connections
  • Topic Routing — Pattern-based message routing
  • Notifications — Real-time NOTIFY/LISTEN listeners
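To build intuition for pattern-based routing, here is a minimal matcher sketch. It assumes AMQP-style wildcards (`*` matches exactly one dot-separated segment, `#` matches zero or more); the client's actual pattern syntax may differ, so consult the Topic Routing docs.

```python
# Illustrative pattern matcher for topic routing (assumption: AMQP-style
# wildcards where '*' matches one dot-separated segment and '#' matches
# any run of segments, including none).
def matches(pattern: str, routing_key: str) -> bool:
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may absorb zero or more segments.
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))

print(matches("orders.*", "orders.created"))     # True
print(matches("orders.#", "orders.eu.created"))  # True
print(matches("orders.*", "orders.eu.created"))  # False
```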

Quick Start

Sync (psycopg):

from pgmq import PGMQueue

queue = PGMQueue()  # reads PG_* env vars by default

# Create a queue
queue.create_queue("my_queue")

# Send a message
msg_id = queue.send("my_queue", {"hello": "world"})

# Send a batch
batch_ids = queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])

# Read with 30s visibility timeout
msg = queue.read("my_queue", vt=30)
print(msg.message)  # {'hello': 'world'}

# Archive when done
queue.archive("my_queue", msg.msg_id)
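A typical worker wraps the read/archive calls above in a poll loop. A minimal sketch, using only the methods shown in the quick start; the `FakeQueue` stand-in is hypothetical, included only so the loop can run without a database:

```python
import time

def consume(queue, name, handler, vt=30, idle_sleep=1.0, max_empty=3):
    """Poll a queue; handle and archive each message; stop after max_empty misses."""
    empty = 0
    while empty < max_empty:
        msg = queue.read(name, vt=vt)
        if msg is None:
            empty += 1
            time.sleep(idle_sleep)
            continue
        empty = 0
        handler(msg.message)             # process the payload
        queue.archive(name, msg.msg_id)  # acknowledge by archiving

# Hypothetical in-memory stand-in so the loop can run without Postgres.
class _Msg:
    def __init__(self, msg_id, message):
        self.msg_id, self.message = msg_id, message

class FakeQueue:
    def __init__(self, payloads):
        self._msgs = [_Msg(i, p) for i, p in enumerate(payloads)]
    def read(self, name, vt=30):
        return self._msgs.pop(0) if self._msgs else None
    def archive(self, name, msg_id):
        pass

seen = []
consume(FakeQueue([{"hello": "world"}]), "my_queue", seen.append, idle_sleep=0.0)
print(seen)  # [{'hello': 'world'}]
```

In production you would pass a real `PGMQueue` instead of `FakeQueue`; if `handler` raises, the message simply reappears after the visibility timeout expires.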

Async (asyncpg):

from pgmq import AsyncPGMQueue

queue = AsyncPGMQueue()
await queue.init()

# Create a queue
await queue.create_queue("my_queue")

# Send a message
msg_id = await queue.send("my_queue", {"hello": "world"})

# Send a batch
batch_ids = await queue.send_batch("my_queue", [{"foo": "bar"}, {"baz": "qux"}])

# Read with 30s visibility timeout
msg = await queue.read("my_queue", vt=30)
print(msg.message)  # {'hello': 'world'}

# Archive when done
await queue.archive("my_queue", msg.msg_id)
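The FIFO feature above orders messages per group key. A sketch of those delivery semantics in plain Python, illustrative only and not the client's API:

```python
from collections import defaultdict, deque

# Illustrative sketch of FIFO-per-group delivery (not the library's code):
# messages sharing a group key come out in send order; groups are independent.
queues = defaultdict(deque)

def send(group_key, body):
    queues[group_key].append(body)

def read(group_key):
    return queues[group_key].popleft() if queues[group_key] else None

send("user-1", {"step": 1})
send("user-1", {"step": 2})
send("user-2", {"step": 1})
first = read("user-1")
print(first)  # {'step': 1} (order preserved within the group)
```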

Development

# Install dependencies
uv sync --all-groups --all-extras

# Run tests (spins up Docker Postgres automatically)
make test

# Run lints
make lint

# Serve docs locally
make docs-serve

License

Apache-2.0