Bỏ qua

Supabase Queue Concepts

pgmq là gì?

pgmq (PostgreSQL Message Queue) là một PostgreSQL extension cho phép sử dụng database như một message queue.

┌─────────────────────────────────────────────────────────────┐
│                    SUPABASE + pgmq                           │
│                                                              │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐    │
│  │  Producer   │────▶│    Queue    │────▶│  Consumer   │    │
│  │  (API/App)  │     │   (pgmq)    │     │ (Worker/CF) │    │
│  └─────────────┘     └─────────────┘     └─────────────┘    │
│                            │                                 │
│                            ▼                                 │
│                      ┌───────────┐                          │
│                      │PostgreSQL │                          │
│                      │  Table    │                          │
│                      └───────────┘                          │
└─────────────────────────────────────────────────────────────┘

Tại sao dùng pgmq?

Lợi ích

Lợi ích Chi tiết
Tích hợp sẵn Không cần Redis, RabbitMQ riêng
ACID Transactional với database operations
Đơn giản Chỉ là PostgreSQL extension
Chi phí Included trong Supabase plan
Reliability Data không mất khi crash

Khi nào dùng?

  • Email notifications
  • Webhook delivery
  • Background processing
  • Task scheduling
  • Event-driven workflows

Kiến trúc Message Queue

┌─────────────────────────────────────────────────────────────┐
│                      MESSAGE LIFECYCLE                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   1. ENQUEUE          2. READ            3. PROCESS         │
│   ┌──────────┐       ┌──────────┐       ┌──────────┐       │
│   │ Producer │──────▶│  Queue   │──────▶│ Consumer │       │
│   │  sends   │       │ (visible)│       │  reads   │       │
│   └──────────┘       └──────────┘       └──────────┘       │
│                            │                  │             │
│                            ▼                  ▼             │
│                      ┌──────────┐       ┌──────────┐       │
│                      │ Message  │       │ Message  │       │
│                      │ hidden   │       │ archived │       │
│                      │(timeout) │       │or deleted│       │
│                      └──────────┘       └──────────┘       │
│                            │                                │
│                            ▼                                │
│                      ┌──────────┐                          │
│                      │ Visible  │  (if not ACK'd)          │
│                      │  again   │                          │
│                      └──────────┘                          │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Core Concepts

1. Message

interface Message<T> {
  msg_id: number;        // Unique message ID
  read_ct: number;       // Number of times read
  enqueued_at: string;   // When enqueued
  vt: string;            // Visibility timeout
  message: T;            // Your payload
}

2. Visibility Timeout

Message enqueued ──▶ Consumer reads ──▶ Message hidden
              ┌────────────────────────┐
              │  Visibility Timeout    │
              │  (e.g., 30 seconds)    │
              └────────────────────────┘
             ┌─────────────┴─────────────┐
             ▼                           ▼
        ACK (delete)              Timeout (visible again)

3. Queue Operations

Operation Description
send Add message to queue
read Get message(s) from queue
archive Mark message as processed
delete Remove message
set_vt Extend visibility timeout

Enable pgmq Extension

Via SQL Editor

-- Enable pgmq extension
CREATE EXTENSION IF NOT EXISTS pgmq;

-- Create a queue
SELECT pgmq.create('email_queue');

-- List queues
SELECT * FROM pgmq.list_queues();

Via Migration

-- supabase/migrations/20240101000000_setup_queues.sql

-- Enable extension
CREATE EXTENSION IF NOT EXISTS pgmq;

-- Create queues
SELECT pgmq.create('email_queue');
SELECT pgmq.create('notification_queue');
SELECT pgmq.create('webhook_queue');

Basic Operations

Send (Enqueue)

-- Send single message
SELECT pgmq.send(
  'email_queue',
  '{"to": "user@example.com", "subject": "Hello"}'::jsonb
);

-- Send with delay (30 seconds)
SELECT pgmq.send(
  'email_queue',
  '{"to": "user@example.com"}'::jsonb,
  30  -- delay in seconds
);

Read (Dequeue)

-- Read 1 message, hide for 30 seconds
SELECT * FROM pgmq.read('email_queue', 30, 1);

-- Read up to 10 messages
SELECT * FROM pgmq.read('email_queue', 30, 10);

Archive (ACK)

-- Archive single message
SELECT pgmq.archive('email_queue', 1);  -- msg_id = 1

-- Archive multiple messages
SELECT pgmq.archive('email_queue', ARRAY[1, 2, 3]);

Delete

-- Delete message
SELECT pgmq.delete('email_queue', 1);

TypeScript Client

Using SQL via Supabase

// lib/queue.ts
import { createClient } from '@supabase/supabase-js';

const supabase = createClient(url, serviceRoleKey);

// Send message
async function enqueue<T>(queue: string, message: T) {
  const { data, error } = await supabase
    .rpc('pgmq_send', {
      queue_name: queue,
      message: message,
    });

  if (error) throw error;
  return data; // Returns msg_id
}

// Read messages
async function dequeue<T>(queue: string, count = 1) {
  const { data, error } = await supabase
    .rpc('pgmq_read', {
      queue_name: queue,
      vt: 30, // visibility timeout
      qty: count,
    });

  if (error) throw error;
  return data as Message<T>[];
}

// Archive (ACK)
async function ack(queue: string, msgId: number) {
  const { data, error } = await supabase
    .rpc('pgmq_archive', {
      queue_name: queue,
      msg_id: msgId,
    });

  if (error) throw error;
  return data;
}

Create RPC Functions

-- supabase/migrations/20240101000001_queue_functions.sql

-- Send message function
CREATE OR REPLACE FUNCTION pgmq_send(
  queue_name TEXT,
  message JSONB,
  delay_seconds INTEGER DEFAULT 0
)
RETURNS BIGINT
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
  RETURN pgmq.send(queue_name, message, delay_seconds);
END;
$$;

-- Read messages function
CREATE OR REPLACE FUNCTION pgmq_read(
  queue_name TEXT,
  vt INTEGER DEFAULT 30,
  qty INTEGER DEFAULT 1
)
RETURNS TABLE (
  msg_id BIGINT,
  read_ct INTEGER,
  enqueued_at TIMESTAMPTZ,
  vt TIMESTAMPTZ,
  message JSONB
)
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
  RETURN QUERY SELECT * FROM pgmq.read(queue_name, vt, qty);
END;
$$;

-- Archive message function
CREATE OR REPLACE FUNCTION pgmq_archive(
  queue_name TEXT,
  msg_id BIGINT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
SECURITY DEFINER
AS $$
BEGIN
  RETURN pgmq.archive(queue_name, msg_id);
END;
$$;

Use Cases

1. Email Notifications

// Producer (API Route)
await enqueue('email_queue', {
  type: 'welcome',
  to: user.email,
  data: { name: user.name },
});

// Consumer (Cloudflare Worker/Cron)
const messages = await dequeue('email_queue', 10);
for (const msg of messages) {
  await sendEmail(msg.message);
  await ack('email_queue', msg.msg_id);
}

2. Webhook Delivery

// Producer
await enqueue('webhook_queue', {
  url: webhook.url,
  payload: event,
  attempts: 0,
});

// Consumer with retry
const msg = (await dequeue('webhook_queue'))[0];
try {
  await fetch(msg.message.url, {
    method: 'POST',
    body: JSON.stringify(msg.message.payload),
  });
  await ack('webhook_queue', msg.msg_id);
} catch (e) {
  // Extend visibility timeout for retry
  await supabase.rpc('pgmq_set_vt', {
    queue_name: 'webhook_queue',
    msg_id: msg.msg_id,
    vt_offset: 60, // retry after 60s
  });
}

Tổng kết

pgmq Key Points

  1. PostgreSQL extension - không cần external service
  2. ACID compliant - transactional với database
  3. Visibility timeout - prevents duplicate processing
  4. Archive - message history for debugging

Queue Lifecycle

send() ──▶ read() ──▶ process ──▶ archive()
                         └──▶ timeout ──▶ visible again

Next: Setup & Configuration

  • Tạo queues
  • Configure visibility timeout
  • Error handling
  • Monitoring

Q&A

  1. Có use case nào cho queue trong dự án của bạn?
  2. Đã dùng message queue trước đây chưa?
  3. Concerns về reliability?