A lightweight message broker system built inside PostgreSQL. pgmb enables asynchronous message processing with HTTP-based worker dispatch, automatic retries, and dead letter queue support.
- Worker Management: Register HTTP endpoints as workers with configurable rate limits (RPS)
- Queue System: Create queues with pattern-based routing keys (supports wildcards)
- Message Routing: Automatic message routing based on routing keys matching binding patterns
- HTTP Dispatch: Automatic message delivery to worker endpoints via HTTP POST
- Retry Logic: Configurable retry attempts with exponential backoff support
- Dead Letter Queue: Messages that still fail after the maximum number of retries are moved to a DLQ
- Scheduled Dispatch: Uses pg_cron for automatic message dispatching
- Delayed Messages: Support for delayed message delivery
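To make the wildcard routing concrete, here is a minimal sketch of how a binding pattern such as order.* could be checked against a routing key. The helper matchesBinding is hypothetical and not part of pgmb, and it assumes that * matches any run of characters; the extension's actual matching rules may differ.

```javascript
// Hypothetical helper, not part of pgmb. Assumes `*` matches any run of
// characters; the extension's real matching rules may be stricter.
function matchesBinding(bindingKey, routingKey) {
  // Escape regex metacharacters, then turn each escaped `*` into `.*`.
  const pattern = bindingKey
    .replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    .replace(/\\\*/g, '.*');
  return new RegExp(`^${pattern}$`).test(routingKey);
}

console.log(matchesBinding('order.*', 'order.created'));   // true
console.log(matchesBinding('order.*', 'payment.created')); // false
```

Under this assumption, a queue bound to order.* would receive order.created and order.cancelled, but not payment.created.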
- PostgreSQL 12 or higher
- pg_cron extension
- http extension (for HTTP requests)
Install from PGXN:

pgxn install pgmb

Or build from source:

- Clone the repository:

git clone https://github.com/fraruiz/pgmb.git
cd pgmb

- Build and install:

make
sudo make install

- Enable the extension in your database:

CREATE EXTENSION pg_cron;
CREATE EXTENSION http;
CREATE EXTENSION pgmb;

SELECT pgmb.worker(
'order_processor', -- worker name
'http://localhost:8080/process', -- endpoint URL
100 -- requests per second limit
);
-- Returns: worker UUID

SELECT pgmb.create(
'order_queue', -- queue name
'order.*', -- binding key pattern (supports *)
5, -- max retries
'550e8400-e29b-41d4-a716-446655440000' -- worker UUID
);
-- Returns: queue UUID

-- Simple message
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb
);
-- With headers
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web", "priority": "high"}'::jsonb
);
-- Delayed message (10 minutes)
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web"}'::jsonb,
now() + interval '10 minutes'
);
-- Delayed message (600 seconds)
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123, "amount": 45.67}'::jsonb,
'{"source": "web"}'::jsonb,
600
);

Registers a new worker in the message broker.
Parameters:
- name (VARCHAR): The name of the worker
- endpoint (VARCHAR): The HTTP endpoint URL where messages will be sent
- rps (INT): Requests per second limit for rate limiting
Returns: UUID of the created worker
Example:
SELECT pgmb.worker('email_sender', 'http://api.example.com/send-email', 50);

Creates a new queue with a binding key pattern.
Parameters:
- name (VARCHAR): Unique name for the queue
- binding_key (VARCHAR): Pattern to match routing keys (supports * wildcard)
- max_retries (INT): Maximum number of retry attempts before moving to DLQ
- worker_id (UUID): The worker UUID that will process messages from this queue
Returns: UUID of the created queue
Example:
SELECT pgmb.create('order_queue', 'order.*', 5, '550e8400-e29b-41d4-a716-446655440000');

Sends a message to the broker.
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
Returns: VOID
Example:
SELECT pgmb.send(
gen_random_uuid(),
'order.created',
'{"order_id": 123}'::jsonb
);

Sends a message with custom headers.
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
- headers (JSONB): Optional message headers
Returns: VOID
Sends a delayed message. The delay can be a TIMESTAMPTZ (an absolute time) or an INTEGER (a number of seconds).
Parameters:
- id (UUID): Unique identifier for the message
- routing_key (VARCHAR): Routing key for message routing
- body (JSONB): Message payload
- headers (JSONB): Optional message headers
- delay (TIMESTAMPTZ or INT): When to enqueue the message
Returns: VOID
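The two delay forms can describe the same delivery time. The sketch below illustrates this with a hypothetical helper, resolveDeliveryTime, which is not part of pgmb. It assumes that an integer delay means "seconds from now", as the 600-second example suggests.

```javascript
// Hypothetical helper mirroring the two documented delay forms.
// Assumption: an integer delay means "seconds from now".
function resolveDeliveryTime(delay, now = new Date()) {
  if (delay instanceof Date) return delay;          // absolute timestamp
  if (Number.isInteger(delay) && delay >= 0) {
    return new Date(now.getTime() + delay * 1000);  // relative seconds
  }
  throw new TypeError('delay must be a Date or a non-negative integer');
}

const now = new Date('2024-01-01T00:00:00Z');
console.log(resolveDeliveryTime(600, now).toISOString());
// 2024-01-01T00:10:00.000Z
```

Under that assumption, a delay of 600 and a delay of now() + interval '10 minutes' resolve to the same instant.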
- Message Publishing: When you call pgmb.send(), a message is inserted into the pgmb.messages table.
- Automatic Routing: A trigger (enqueue_message_trigger) automatically routes messages to matching queues based on routing key patterns.
- Queue Processing: Each queue has its own table ({queue_name}_queue) that stores message references.
- Scheduled Dispatch: pg_cron runs pgmb.dispatch_messages() every second for each queue, which:
  - Locks messages for processing (using FOR UPDATE SKIP LOCKED)
  - Sends HTTP POST requests to worker endpoints
  - Handles acknowledgments and retries
  - Moves failed messages to dead letter queues after max retries
- Dead Letter Queue: Failed messages are moved to {queue_name}_dead_letter_queue after exceeding max retries.
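The outcome of a single delivery attempt in the flow above can be modeled roughly as follows. This is an illustrative sketch, not pgmb's implementation: nextAction and its parameters are hypothetical, the backoff is assumed to double from a 1-second base, and a message is assumed to be dead-lettered once its failed attempts reach maxRetries.

```javascript
// Hypothetical model of one delivery attempt; names are illustrative only.
// Assumptions: backoff doubles from a 1-second base, and a message goes to
// the dead letter queue once its failed attempts reach maxRetries.
function nextAction(statusCode, attempts, maxRetries, baseDelaySeconds = 1) {
  if (statusCode >= 200 && statusCode < 300) {
    return { action: 'acknowledge' };
  }
  const failedAttempts = attempts + 1;
  if (failedAttempts >= maxRetries) {
    return { action: 'dead_letter' };
  }
  // Exponential backoff: 1s, 2s, 4s, ... after each failure.
  const retryInSeconds = baseDelaySeconds * 2 ** (failedAttempts - 1);
  return { action: 'retry', retryInSeconds };
}

console.log(nextAction(200, 0, 5)); // { action: 'acknowledge' }
console.log(nextAction(500, 0, 5)); // { action: 'retry', retryInSeconds: 1 }
console.log(nextAction(500, 2, 5)); // { action: 'retry', retryInSeconds: 4 }
console.log(nextAction(500, 4, 5)); // { action: 'dead_letter' }
```

The exact retry schedule and the point at which a message is dead-lettered depend on the extension itself; treat the numbers here as placeholders.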
- pgmb.workers: Stores worker registrations
- pgmb.queues: Stores queue definitions and bindings
- pgmb.messages: Stores all messages
- pgmb.{queue_name}_queue: Per-queue message references
- pgmb.{queue_name}_dead_letter_queue: Per-queue failed messages
SELECT * FROM pgmb.workers;
SELECT * FROM pgmb.queues;
SELECT COUNT(*) FROM pgmb.order_queue WHERE acknoledge = false;
SELECT * FROM pgmb.order_dead_letter_queue;

Your worker endpoints should:
- Accept HTTP POST requests
- Accept JSON body
- Return HTTP status codes:
  - 2xx: Success (message will be acknowledged)
  - 4xx/5xx: Failure (message will be retried)
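The status-code contract above can be kept separate from any web framework, which makes it easy to unit test. The sketch below is one possible shape, not a required one: handleDelivery and processMessage are hypothetical names, and the JSON body is assumed to arrive as a raw string.

```javascript
// Hypothetical, framework-free core of a worker: maps the outcome of
// processing to the status code the broker would see. `processMessage`
// is a placeholder for your own logic.
async function handleDelivery(rawBody, processMessage) {
  let message;
  try {
    message = JSON.parse(rawBody);
  } catch {
    // 4xx counts as a failure, so this message will be retried.
    return { status: 400, body: { error: 'invalid JSON' } };
  }
  try {
    await processMessage(message);
    return { status: 200, body: { success: true } };        // acknowledged
  } catch (error) {
    return { status: 500, body: { error: error.message } }; // retried
  }
}
```

Because 4xx responses are retried as well, a worker that rejects a malformed message will receive it again until the queue's max retries is reached.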
Example Worker Endpoint (Node.js):
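const express = require('express');
const app = express();
app.use(express.json()); // parse JSON request bodies into req.body

app.post('/process', async (req, res) => {
  try {
    await processMessage(req.body); // your own message handling logic
    res.status(200).json({ success: true }); // 2xx: message is acknowledged
  } catch (error) {
    res.status(500).json({ error: error.message }); // non-2xx: message is retried
  }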
});

PostgreSQL License
Francisco Ruiz - franciscoruizlezcano@gmail.com