# Phase 5: Queue Processing

## Goal

Implement background job processing with Supabase pgmq and Cloudflare Queues.

Estimated time: 3 hours

## Overview: Which Queue to Use?
| Use Case | Supabase pgmq | Cloudflare Queues |
|---|---|---|
| Email notifications | ✅ | ✅ Recommended |
| Activity logging | ✅ Recommended | ✅ |
| DB-heavy operations | ✅ | |
| High throughput | | ✅ |
In this phase, we will implement both so we can compare them in practice.
## Part A: Supabase pgmq (Database Queue)

### Step 1: Enable pgmq Extension
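The command for this step is missing above; a minimal sketch follows. pgmq ships as a regular Postgres extension on Supabase, so enabling it is a one-line migration (you can also toggle it in the Dashboard under Database → Extensions). The migration filename is an assumption:

```sql
-- supabase/migrations/20240101000009_enable_pgmq.sql (hypothetical filename)
CREATE EXTENSION IF NOT EXISTS pgmq;
```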
### Step 2: Create Queues
```sql
-- supabase/migrations/20240101000010_create_queues.sql

-- Create activity log queue
SELECT pgmq.create('activity_log');

-- Create notification queue
SELECT pgmq.create('notifications');

-- Activity log table (for persistence)
CREATE TABLE public.activity_logs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  entity_type TEXT NOT NULL,
  entity_id UUID NOT NULL,
  action TEXT NOT NULL,
  actor_id UUID REFERENCES auth.users(id),
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_activity_entity ON activity_logs(entity_type, entity_id);
CREATE INDEX idx_activity_actor ON activity_logs(actor_id);
```
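Before wiring up any triggers, it is worth a quick smoke test from the SQL editor; all three calls below are part of pgmq itself:

```sql
-- Enqueue a test message
SELECT pgmq.send('activity_log', '{"hello": "world"}'::jsonb);

-- Read it back with a 30-second visibility timeout, max 1 message
SELECT * FROM pgmq.read('activity_log', 30, 1);

-- Delete it, using the msg_id returned by the calls above
SELECT pgmq.delete('activity_log', 1::BIGINT);
```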
### Step 3: Producer - Trigger on Task Changes
```sql
-- supabase/migrations/20240101000011_activity_trigger.sql
CREATE OR REPLACE FUNCTION enqueue_activity_log()
RETURNS TRIGGER AS $$
DECLARE
  action_type TEXT;
  task_data JSONB;
BEGIN
  -- Determine action
  IF TG_OP = 'INSERT' THEN
    action_type := 'created';
    task_data := to_jsonb(NEW);
  ELSIF TG_OP = 'UPDATE' THEN
    action_type := 'updated';
    task_data := jsonb_build_object(
      'old', to_jsonb(OLD),
      'new', to_jsonb(NEW),
      'changes', (
        SELECT jsonb_object_agg(key, value)
        FROM jsonb_each(to_jsonb(NEW))
        WHERE to_jsonb(NEW) -> key != to_jsonb(OLD) -> key
      )
    );
  ELSIF TG_OP = 'DELETE' THEN
    action_type := 'deleted';
    task_data := to_jsonb(OLD);
  END IF;

  -- Enqueue activity (NEW/OLD are NULL for the non-matching operations,
  -- so COALESCE picks whichever row is available)
  PERFORM pgmq.send(
    'activity_log',
    jsonb_build_object(
      'entity_type', 'task',
      'entity_id', COALESCE(NEW.id, OLD.id),
      'action', action_type,
      'actor_id', auth.uid(),
      'data', task_data,
      'timestamp', NOW()
    )
  );

  -- Also enqueue a notification for assignment changes
  IF TG_OP = 'UPDATE'
    AND OLD.assignee_id IS DISTINCT FROM NEW.assignee_id
    AND NEW.assignee_id IS NOT NULL THEN
    PERFORM pgmq.send(
      'notifications',
      jsonb_build_object(
        'type', 'task_assigned',
        'task_id', NEW.id,
        'task_title', NEW.title,
        'assignee_id', NEW.assignee_id,
        'assigner_id', auth.uid()
      )
    );
  END IF;

  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

CREATE TRIGGER on_task_change
  AFTER INSERT OR UPDATE OR DELETE ON tasks
  FOR EACH ROW
  EXECUTE FUNCTION enqueue_activity_log();
```
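To confirm the trigger actually enqueues, change any task row and then check the queue depth; `pgmq.metrics` is built into the extension:

```sql
-- After inserting or updating a task:
SELECT queue_length, total_messages FROM pgmq.metrics('activity_log');
```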
### Step 4: Consumer - Edge Function
```ts
// supabase/functions/process-activity-queue/index.ts
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2';

const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);

serve(async (req) => {
  try {
    // Read messages from the queue.
    // `pgmq_read` is a wrapper around pgmq.read() exposed in the public
    // schema so it is callable via PostgREST RPC (see "Common Issues" below).
    const { data: messages, error: readError } = await supabase.rpc(
      'pgmq_read',
      {
        queue_name: 'activity_log',
        vt: 60, // 60 second visibility timeout
        qty: 10, // Process 10 at a time
      }
    );

    if (readError) throw readError;

    if (!messages || messages.length === 0) {
      return new Response(JSON.stringify({ processed: 0 }));
    }

    let processed = 0;

    for (const msg of messages) {
      try {
        const activity = msg.message;

        // Insert into activity_logs table
        const { error: insertError } = await supabase
          .from('activity_logs')
          .insert({
            entity_type: activity.entity_type,
            entity_id: activity.entity_id,
            action: activity.action,
            actor_id: activity.actor_id,
            metadata: activity.data,
          });

        if (insertError) throw insertError;

        // Delete the processed message (a wrapper around pgmq.delete())
        await supabase.rpc('pgmq_delete', {
          queue_name: 'activity_log',
          msg_id: msg.msg_id,
        });

        processed++;
      } catch (err) {
        console.error('Failed to process message:', msg.msg_id, err);
        // Message will become visible again after the visibility timeout
      }
    }

    return new Response(JSON.stringify({ processed }));
  } catch (error) {
    console.error('Queue processing error:', error);
    return new Response(JSON.stringify({ error: error.message }), {
      status: 500,
    });
  }
});
```
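To exercise the consumer manually before scheduling it, deploy it with the Supabase CLI and invoke it directly (the placeholders match the ones used in Step 5):

```bash
supabase functions deploy process-activity-queue

curl -X POST \
  -H "Authorization: Bearer YOUR_SERVICE_ROLE_KEY" \
  https://YOUR_PROJECT.supabase.co/functions/v1/process-activity-queue
```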
### Step 5: Schedule Consumer (pg_cron)
```sql
-- Run the consumer every minute
SELECT cron.schedule(
  'process-activity-queue',
  '* * * * *',
  $$
  SELECT net.http_post(
    url := 'https://YOUR_PROJECT.supabase.co/functions/v1/process-activity-queue',
    headers := '{"Authorization": "Bearer YOUR_SERVICE_ROLE_KEY"}'::jsonb
  );
  $$
);
```
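Note that this schedule depends on two more extensions: `pg_cron` for the schedule itself and `pg_net` for `net.http_post`. Enable them if you haven't already:

```sql
CREATE EXTENSION IF NOT EXISTS pg_cron;
CREATE EXTENSION IF NOT EXISTS pg_net;
```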
## Part B: Cloudflare Queues

### Step 1: Create Queue
```bash
# Create queue
wrangler queues create taskflow-notifications

# Create dead letter queue
wrangler queues create taskflow-notifications-dlq
```
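You can verify both queues were created with `wrangler queues list`.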
### Step 2: Configure wrangler.toml
```toml
# wrangler.toml
name = "taskflow"

# Queue producer binding
[[queues.producers]]
queue = "taskflow-notifications"
binding = "NOTIFICATION_QUEUE"

# Queue consumer
[[queues.consumers]]
queue = "taskflow-notifications"
max_batch_size = 10
max_retries = 3
dead_letter_queue = "taskflow-notifications-dlq"
```
### Step 3: Producer - API Route
```ts
// src/app/api/tasks/[id]/assign/route.ts
import { createClient } from '@/lib/supabase/server';
import { NextResponse } from 'next/server';

export async function POST(
  request: Request,
  { params }: { params: { id: string } }
) {
  const supabase = await createClient();
  const { assignee_id } = await request.json();

  const { data: { user } } = await supabase.auth.getUser();
  if (!user) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
  }

  // Update task
  const { data: task, error } = await supabase
    .from('tasks')
    .update({ assignee_id })
    .eq('id', params.id)
    .select('id, title')
    .single();

  if (error) {
    return NextResponse.json({ error: error.message }, { status: 400 });
  }

  // Enqueue notification via Cloudflare Queue.
  // Next.js cannot bind to the queue directly, so this goes through a
  // separate Worker endpoint (see Step 4).
  await fetch(`${process.env.WORKER_URL}/api/queue/notification`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      type: 'task_assigned',
      taskId: task.id,
      taskTitle: task.title,
      assigneeId: assignee_id,
      assignerId: user.id,
    }),
  });

  return NextResponse.json({ task });
}
```
### Step 4: Queue Worker (Producer endpoint)
```ts
// workers/queue-api/src/index.ts
import { Hono } from 'hono';

interface Env {
  NOTIFICATION_QUEUE: Queue;
}

const app = new Hono<{ Bindings: Env }>();

app.post('/api/queue/notification', async (c) => {
  const body = await c.req.json();

  // In production this endpoint should be authenticated (e.g. a shared
  // secret header), since anyone who can reach it can enqueue messages.
  await c.env.NOTIFICATION_QUEUE.send({
    ...body,
    timestamp: Date.now(),
  });

  return c.json({ queued: true });
});

export default app;
```
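For `c.env.NOTIFICATION_QUEUE` to resolve, this worker needs its own producer binding. A minimal config sketch, assuming the worker lives at `workers/queue-api/` as the file path above suggests (the name and compatibility date are placeholders):

```toml
# workers/queue-api/wrangler.toml (hypothetical)
name = "taskflow-queue-api"
main = "src/index.ts"
compatibility_date = "2024-01-01"

[[queues.producers]]
queue = "taskflow-notifications"
binding = "NOTIFICATION_QUEUE"
```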
### Step 5: Queue Consumer
```ts
// workers/notification-consumer/src/index.ts
import { createClient, type SupabaseClient } from '@supabase/supabase-js';

interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SENDGRID_API_KEY: string;
}

interface NotificationMessage {
  type: string;
  taskId: string;
  taskTitle: string;
  assigneeId: string;
  assignerId: string;
  timestamp: number;
}

export default {
  async queue(
    batch: MessageBatch<NotificationMessage>,
    env: Env
  ): Promise<void> {
    const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

    for (const message of batch.messages) {
      try {
        const notification = message.body;

        if (notification.type === 'task_assigned') {
          await handleTaskAssigned(notification, supabase, env);
        }

        message.ack();
      } catch (error) {
        console.error('Failed to process notification:', error);
        message.retry();
      }
    }
  },
};

async function handleTaskAssigned(
  notification: NotificationMessage,
  supabase: SupabaseClient,
  env: Env
) {
  // Get assignee email
  const { data: assignee } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', notification.assigneeId)
    .single();

  if (!assignee) return;

  // Get assigner name
  const { data: assigner } = await supabase
    .from('profiles')
    .select('full_name')
    .eq('id', notification.assignerId)
    .single();

  // Send email via SendGrid
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [
        { to: [{ email: assignee.email, name: assignee.full_name }] }
      ],
      from: { email: 'noreply@taskflow.app', name: 'TaskFlow' },
      subject: `Task assigned: ${notification.taskTitle}`,
      content: [
        {
          type: 'text/html',
          value: `
            <h2>You've been assigned a task</h2>
            <p><strong>${notification.taskTitle}</strong></p>
            <p>Assigned by: ${assigner?.full_name || 'Someone'}</p>
            <a href="https://taskflow.app/tasks/${notification.taskId}">View Task</a>
          `,
        },
      ],
    }),
  });

  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }

  // Also save to notifications table (in-app notification)
  await supabase.from('notifications').insert({
    user_id: notification.assigneeId,
    type: 'task_assigned',
    title: 'New task assigned',
    message: `You've been assigned "${notification.taskTitle}"`,
    data: {
      task_id: notification.taskId,
      assigner_id: notification.assignerId,
    },
  });
}
```
### Step 6: Deploy Consumer
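The commands for this step are missing above; a plausible sketch with the Wrangler CLI, assuming the consumer lives at `workers/notification-consumer/`:

```bash
cd workers/notification-consumer

# Provide the values the consumer reads from env
wrangler secret put SUPABASE_URL
wrangler secret put SUPABASE_SERVICE_KEY
wrangler secret put SENDGRID_API_KEY

# Deploying attaches the worker to the queue via its [[queues.consumers]] config
wrangler deploy
```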
## Comparison Summary

### When to use Supabase pgmq

- ✅ Activity logging (needs a database transaction)
- ✅ Complex queries before/after processing
- ✅ Data needs to stay in the database
- ✅ Simple setup (no extra service)
- ❌ Avoid if you need high throughput
- ❌ Avoid if you need complex retry logic
### When to use Cloudflare Queues

- ✅ Email/SMS notifications (external APIs)
- ✅ High-throughput processing
- ✅ Built-in retries with a DLQ
- ✅ Processing at the edge
- ❌ Avoid if you need a database transaction
- ❌ Overkill for simple use cases
## Verification Checklist
- [ ] pgmq extension enabled
- [ ] Activity queue created and working
- [ ] Task changes trigger activity logging
- [ ] Cloudflare Queue created
- [ ] Notification producer working
- [ ] Notification consumer processing messages
- [ ] Email notifications sending (or simulated)
## Common Issues

### pgmq: Function not found
Solution: Ensure pgmq extension is enabled and functions are exposed via RPC.
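pgmq's functions live in the `pgmq` schema, which PostgREST does not expose by default. One way to satisfy the `pgmq_read`/`pgmq_delete` RPC calls used in Step 4 is a pair of SECURITY DEFINER wrappers in `public` (a sketch; adjust grants to match your security model):

```sql
CREATE OR REPLACE FUNCTION public.pgmq_read(queue_name TEXT, vt INTEGER, qty INTEGER)
RETURNS SETOF pgmq.message_record
LANGUAGE sql SECURITY DEFINER
AS $$ SELECT * FROM pgmq.read(queue_name, vt, qty); $$;

CREATE OR REPLACE FUNCTION public.pgmq_delete(queue_name TEXT, msg_id BIGINT)
RETURNS BOOLEAN
LANGUAGE sql SECURITY DEFINER
AS $$ SELECT pgmq.delete(queue_name, msg_id); $$;
```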
### Cloudflare Queues: Message not acknowledged

Messages keep reappearing because `message.ack()` was never called.

Solution: Always call `message.ack()` after successful processing.
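If the failure is likely transient (e.g. a rate-limited email API), Cloudflare also lets you delay the retry rather than retrying immediately:

```ts
// Back off for 60 seconds before the message becomes deliverable again
message.retry({ delaySeconds: 60 });
```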
## Next Phase

Continue to Phase 6: Scheduled Jobs to implement cron jobs.