Supabase Queue Concepts
pgmq là gì?
pgmq (PostgreSQL Message Queue) là một PostgreSQL extension cho phép sử dụng database như một message queue.
┌─────────────────────────────────────────────────────────────┐
│ SUPABASE + pgmq │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Producer │────▶│ Queue │────▶│ Consumer │ │
│ │ (API/App) │ │ (pgmq) │ │ (Worker/CF) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │PostgreSQL │ │
│ │ Table │ │
│ └───────────┘ │
└─────────────────────────────────────────────────────────────┘
Tại sao dùng pgmq?
Lợi ích
| Lợi ích |
Chi tiết |
| Tích hợp sẵn |
Không cần Redis, RabbitMQ riêng |
| ACID |
Transactional với database operations |
| Đơn giản |
Chỉ là PostgreSQL extension |
| Chi phí |
Included trong Supabase plan |
| Reliability |
Data không mất khi crash |
Khi nào dùng?
- Email notifications
- Webhook delivery
- Background processing
- Task scheduling
- Event-driven workflows
Kiến trúc Message Queue
┌─────────────────────────────────────────────────────────────┐
│ MESSAGE LIFECYCLE │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. ENQUEUE 2. READ 3. PROCESS │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Producer │──────▶│ Queue │──────▶│ Consumer │ │
│ │ sends │ │ (visible)│ │ reads │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ │
│ │ Message │ │ Message │ │
│ │ hidden │ │ archived │ │
│ │(timeout) │ │or deleted│ │
│ └──────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Visible │ (if not ACK'd) │
│ │ again │ │
│ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Core Concepts
1. Message
interface Message<T> {
msg_id: number; // Unique message ID
read_ct: number; // Number of times read
enqueued_at: string; // When enqueued
vt: string; // Visibility timeout
message: T; // Your payload
}
2. Visibility Timeout
Message enqueued ──▶ Consumer reads ──▶ Message hidden
│
▼
┌────────────────────────┐
│ Visibility Timeout │
│ (e.g., 30 seconds) │
└────────────────────────┘
│
┌─────────────┴─────────────┐
▼ ▼
ACK (delete) Timeout (visible again)
3. Queue Operations
| Operation |
Description |
send |
Add message to queue |
read |
Get message(s) from queue |
archive |
Mark message as processed |
delete |
Remove message |
set_vt |
Extend visibility timeout |
Enable pgmq Extension
Via SQL Editor
-- Enable pgmq extension
CREATE EXTENSION IF NOT EXISTS pgmq;
-- Create a queue
SELECT pgmq.create('email_queue');
-- List queues
SELECT * FROM pgmq.list_queues();
Via Migration
-- supabase/migrations/20240101000000_setup_queues.sql
-- Enable extension
CREATE EXTENSION IF NOT EXISTS pgmq;
-- Create queues
SELECT pgmq.create('email_queue');
SELECT pgmq.create('notification_queue');
SELECT pgmq.create('webhook_queue');
Basic Operations
Send (Enqueue)
-- Send single message
SELECT pgmq.send(
'email_queue',
'{"to": "user@example.com", "subject": "Hello"}'::jsonb
);
-- Send with delay (30 seconds)
SELECT pgmq.send(
'email_queue',
'{"to": "user@example.com"}'::jsonb,
30 -- delay in seconds
);
Read (Dequeue)
-- Read 1 message, hide for 30 seconds
SELECT * FROM pgmq.read('email_queue', 30, 1);
-- Read up to 10 messages
SELECT * FROM pgmq.read('email_queue', 30, 10);
Archive (ACK)
-- Archive single message
SELECT pgmq.archive('email_queue', 1); -- msg_id = 1
-- Archive multiple messages
SELECT pgmq.archive('email_queue', ARRAY[1, 2, 3]);
Delete
-- Delete message
SELECT pgmq.delete('email_queue', 1);
TypeScript Client
Using SQL via Supabase
// lib/queue.ts
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(url, serviceRoleKey);
// Send message
async function enqueue<T>(queue: string, message: T) {
const { data, error } = await supabase
.rpc('pgmq_send', {
queue_name: queue,
message: message,
});
if (error) throw error;
return data; // Returns msg_id
}
// Read messages
async function dequeue<T>(queue: string, count = 1) {
const { data, error } = await supabase
.rpc('pgmq_read', {
queue_name: queue,
vt: 30, // visibility timeout
qty: count,
});
if (error) throw error;
return data as Message<T>[];
}
// Archive (ACK)
async function ack(queue: string, msgId: number) {
const { data, error } = await supabase
.rpc('pgmq_archive', {
queue_name: queue,
msg_id: msgId,
});
if (error) throw error;
return data;
}
Create RPC Functions
-- supabase/migrations/20240101000001_queue_functions.sql
-- Send message function
CREATE OR REPLACE FUNCTION pgmq_send(
queue_name TEXT,
message JSONB,
delay_seconds INTEGER DEFAULT 0
)
RETURNS BIGINT
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RETURN pgmq.send(queue_name, message, delay_seconds);
END;
$$;
-- Read messages function
CREATE OR REPLACE FUNCTION pgmq_read(
queue_name TEXT,
vt INTEGER DEFAULT 30,
qty INTEGER DEFAULT 1
)
RETURNS TABLE (
msg_id BIGINT,
read_ct INTEGER,
enqueued_at TIMESTAMPTZ,
vt TIMESTAMPTZ,
message JSONB
)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RETURN QUERY SELECT * FROM pgmq.read(queue_name, vt, qty);
END;
$$;
-- Archive message function
CREATE OR REPLACE FUNCTION pgmq_archive(
queue_name TEXT,
msg_id BIGINT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
RETURN pgmq.archive(queue_name, msg_id);
END;
$$;
Use Cases
1. Email Notifications
// Producer (API Route)
await enqueue('email_queue', {
type: 'welcome',
to: user.email,
data: { name: user.name },
});
// Consumer (Cloudflare Worker/Cron)
const messages = await dequeue('email_queue', 10);
for (const msg of messages) {
await sendEmail(msg.message);
await ack('email_queue', msg.msg_id);
}
2. Webhook Delivery
// Producer
await enqueue('webhook_queue', {
url: webhook.url,
payload: event,
attempts: 0,
});
// Consumer with retry
const msg = (await dequeue('webhook_queue'))[0];
try {
await fetch(msg.message.url, {
method: 'POST',
body: JSON.stringify(msg.message.payload),
});
await ack('webhook_queue', msg.msg_id);
} catch (e) {
// Extend visibility timeout for retry
await supabase.rpc('pgmq_set_vt', {
queue_name: 'webhook_queue',
msg_id: msg.msg_id,
vt_offset: 60, // retry after 60s
});
}
Tổng kết
pgmq Key Points
- PostgreSQL extension - không cần external service
- ACID compliant - transactional với database
- Visibility timeout - prevents duplicate processing
- Archive - message history for debugging
Queue Lifecycle
send() ──▶ read() ──▶ process ──▶ archive()
│
└──▶ timeout ──▶ visible again
Next: Setup & Configuration
- Tạo queues
- Configure visibility timeout
- Error handling
- Monitoring
Q&A
- Có use case nào cho queue trong dự án của bạn?
- Đã dùng message queue trước đây chưa?
- Concerns về reliability?