Case Study: Background Jobs (Queue & Cron)
Overview
Build a background job system using queues and cron jobs, comparing Supabase and Cloudflare.
Requirements
- Email notifications (queue-based)
- Daily reports generation (cron)
- Retry handling for failed jobs
- Job monitoring and logging
- Cleanup of old data (cron)
Expected Outcomes
- Reliable job processing
- Proper error handling
- Monitoring dashboard
- Comparison insights
Architecture
┌─────────────────────────────────────────────────────────────┐
│ BACKGROUND JOBS ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ QUEUE JOBS (Event-driven) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Option A: Supabase pgmq │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Database Trigger → pgmq.send() → Consumer │ │ │
│ │ │ (Edge Function polling or external) │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Option B: Cloudflare Queues │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Worker (Producer) → Queue → Worker (Consumer) │ │ │
│ │ │ Built-in retry, DLQ │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ CRON JOBS (Time-based) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Option A: Supabase pg_cron │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Scheduled SQL → Runs inside PostgreSQL │ │ │
│ │ │ Great for: Data aggregation, cleanup │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Option B: Cloudflare Cron Triggers │ │
│ │ ┌────────────────────────────────────────────┐ │ │
│ │ │ Scheduled Worker → Runs at edge │ │ │
│ │ │ Great for: API calls, notifications │ │ │
│ │ └────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Comparison Table
Queue: Supabase pgmq vs Cloudflare Queues
| Aspect | Supabase pgmq | Cloudflare Queues |
|---|---|---|
| Architecture | PostgreSQL extension | Distributed edge |
| Latency | 10-50ms | 1-5ms |
| Throughput | Limited by DB | High, auto-scale |
| Persistence | ACID, in DB | Durable, at-least-once delivery |
| Retry | Manual | Built-in, configurable delay |
| DLQ | Manual | Built-in |
| Cost | Included | $0.40/M operations |
| Best for | DB-centric workflows | High throughput |
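The Retry and DLQ rows hinge on what happens to a message that is read but never acknowledged. The toy model below imitates pgmq's visibility timeout in memory: `read()` hides a message and increments its read count, and a message that is never deleted becomes readable again once the timeout passes. `ToyVisibilityQueue` is a hypothetical class for illustration, not the pgmq API; timestamps are passed in explicitly so the behavior is deterministic.

```typescript
// Toy, in-memory model of pgmq-style visibility-timeout semantics,
// for illustration only (NOT the pgmq API). Times are in milliseconds.
interface ToyMessage<T> {
  msgId: number;
  readCt: number; // incremented on every read, like pgmq's read_ct
  visibleAt: number; // when the message can be read again
  message: T;
}

class ToyVisibilityQueue<T> {
  private nextId = 1;
  private messages: ToyMessage<T>[] = [];

  send(message: T, now: number): number {
    const msgId = this.nextId++;
    this.messages.push({ msgId, readCt: 0, visibleAt: now, message });
    return msgId;
  }

  // Return up to `qty` visible messages (oldest first) and hide each for `vtSeconds`.
  read(vtSeconds: number, qty: number, now: number): ToyMessage<T>[] {
    const batch: ToyMessage<T>[] = [];
    for (const msg of this.messages) {
      if (batch.length >= qty) break;
      if (msg.visibleAt <= now) {
        msg.readCt += 1;
        msg.visibleAt = now + vtSeconds * 1000;
        batch.push({ ...msg });
      }
    }
    return batch;
  }

  // Acknowledge a processed message by removing it for good.
  delete(msgId: number): boolean {
    const before = this.messages.length;
    this.messages = this.messages.filter((m) => m.msgId !== msgId);
    return this.messages.length < before;
  }

  size(): number {
    return this.messages.length;
  }
}
```

This is why a pgmq consumer deletes only after success: if processing crashes, the visibility timeout lapses and the message is read again with a higher read count.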
Cron: Supabase pg_cron vs Cloudflare Cron Triggers
| Aspect | Supabase pg_cron | Cloudflare Cron |
|---|---|---|
| Runs in | PostgreSQL | Edge Worker |
| Language | SQL/PLpgSQL | TypeScript/JS |
| Timeout | Statement timeout | Up to 15 min wall-clock; CPU limit depends on plan |
| Access | Database direct | External APIs |
| Monitoring | cron.job_run_details | Workers Analytics |
| Best for | DB maintenance | External integrations |
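Both options schedule work with five-field cron expressions. A minimal, hypothetical matcher that supports only `*` or a single number per field (enough for every schedule in this case study) shows how an expression such as `0 2 * * 0` is evaluated against a UTC timestamp. Day-of-week follows the standard cron/pg_cron numbering (0 or 7 = Sunday); check Cloudflare's documentation before assuming the same numbering for Cron Triggers.

```typescript
// Minimal cron matcher: each field must be "*" or a single integer.
// Ranges, lists and steps ("1-5", "1,3", "*/15") are NOT supported.
type CronField = number | "*";

function parseCronField(raw: string): CronField {
  if (raw === "*") return "*";
  if (!/^\d+$/.test(raw)) {
    throw new Error(`Unsupported cron field "${raw}" (only "*" or a number)`);
  }
  return Number(raw);
}

// True if `date` (evaluated in UTC) falls on a minute selected by `expr`.
function cronMatchesUtc(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/).map(parseCronField);
  if (fields.length !== 5) throw new Error(`Expected 5 fields, got ${fields.length}`);
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields as [
    CronField, CronField, CronField, CronField, CronField,
  ];
  const matches = (field: CronField, value: number) => field === "*" || field === value;

  const weekday = date.getUTCDay(); // 0 = Sunday
  const domOk = matches(dayOfMonth, date.getUTCDate());
  const dowOk = matches(dayOfWeek, weekday) || (dayOfWeek === 7 && weekday === 0);
  // Standard cron rule: if BOTH day fields are restricted, either may match
  const dayOk = dayOfMonth !== "*" && dayOfWeek !== "*" ? domOk || dowOk : domOk && dowOk;

  return (
    matches(minute, date.getUTCMinutes()) &&
    matches(hour, date.getUTCHours()) &&
    matches(month, date.getUTCMonth() + 1) &&
    dayOk
  );
}
```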
Implementation: Supabase Approach
Setup pgmq
-- Enable pgmq extension (Dashboard → Database → Extensions)
CREATE EXTENSION IF NOT EXISTS pgmq;
-- Create queues
SELECT pgmq.create('email_notifications');
SELECT pgmq.create('report_generation');
SELECT pgmq.create('cleanup_tasks');
Producer: Trigger-based Enqueue
-- Trigger: Enqueue email when user signs up
CREATE OR REPLACE FUNCTION enqueue_welcome_email()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pgmq.send(
'email_notifications',
jsonb_build_object(
'type', 'welcome_email',
'user_id', NEW.id,
'email', NEW.email,
'name', NEW.raw_user_meta_data->>'full_name',
'created_at', NOW()
)
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
-- Pin search_path for SECURITY DEFINER; pgmq.send is schema-qualified
SET search_path = '';
CREATE TRIGGER on_user_signup
AFTER INSERT ON auth.users
FOR EACH ROW
EXECUTE FUNCTION enqueue_welcome_email();
-- Trigger: Enqueue notification when task assigned
CREATE OR REPLACE FUNCTION enqueue_task_notification()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.assignee_id IS DISTINCT FROM OLD.assignee_id
AND NEW.assignee_id IS NOT NULL THEN
PERFORM pgmq.send(
'email_notifications',
jsonb_build_object(
'type', 'task_assigned',
'task_id', NEW.id,
'task_title', NEW.title,
'assignee_id', NEW.assignee_id,
'assigner_id', auth.uid()
)
);
END IF;
RETURN NEW;
END;
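$$ LANGUAGE plpgsql SECURITY DEFINER
-- Pin search_path for SECURITY DEFINER; pgmq.send and auth.uid are schema-qualified
SET search_path = '';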
CREATE TRIGGER on_task_assigned
AFTER UPDATE ON tasks
FOR EACH ROW
EXECUTE FUNCTION enqueue_task_notification();
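The triggers build their payloads with `jsonb_build_object`, while the Edge Function consumer reads them as `any`. A hypothetical TypeScript view of the two payloads, with a type guard that rejects malformed messages, could look like this. The keys mirror the triggers; treating `tasks.id` as a uuid (serialized as a string) is an assumption.

```typescript
// Hypothetical types for the jsonb payloads built by the two triggers.
interface WelcomeEmailJob {
  type: "welcome_email";
  user_id: string;
  email: string;
  name: string | null; // raw_user_meta_data->>'full_name' can be NULL
  created_at: string;
}

interface TaskAssignedJob {
  type: "task_assigned";
  task_id: string; // assumes tasks.id is a uuid
  task_title: string;
  assignee_id: string;
  assigner_id: string | null; // auth.uid() is NULL without a user JWT
}

type EmailJob = WelcomeEmailJob | TaskAssignedJob;

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null;
}

// Narrow an untrusted queue payload to a known job type, or return null.
// Only the fields the consumer relies on are checked.
function parseEmailJob(raw: unknown): EmailJob | null {
  if (!isRecord(raw)) return null;
  if (raw.type === "welcome_email" && typeof raw.email === "string" && typeof raw.user_id === "string") {
    return raw as unknown as WelcomeEmailJob;
  }
  if (raw.type === "task_assigned" && typeof raw.assignee_id === "string" && typeof raw.task_id === "string") {
    return raw as unknown as TaskAssignedJob;
  }
  return null;
}
```

A consumer can call `parseEmailJob(msg.message)` first and switch on the narrowed `type`, so a malformed payload is handled explicitly instead of surfacing as a runtime error deep inside an email helper.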
Consumer: Edge Function
// supabase/functions/process-email-queue/index.ts
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2';
const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);
// Each invocation processes one batch; the function must be invoked periodically (polling)
serve(async () => {
  try {
    // Read up to 10 messages and hide them for 60 s (visibility timeout).
    // Assumes the Supabase Queues wrappers in the pgmq_public schema are
    // exposed via the Data API (Queues settings in the Dashboard).
    const { data: messages, error } = await supabase
      .schema('pgmq_public')
      .rpc('read', {
        queue_name: 'email_notifications',
        sleep_seconds: 60, // visibility timeout in seconds
        n: 10,
      });
    if (error) throw error;
    if (!messages || messages.length === 0) {
      return new Response(JSON.stringify({ processed: 0 }));
    }
    let processed = 0;
    let failed = 0;
    for (const msg of messages) {
      try {
        await processEmailMessage(msg.message);
        // Delete only after successful processing
        await supabase.schema('pgmq_public').rpc('delete', {
          queue_name: 'email_notifications',
          message_id: msg.msg_id,
        });
        processed++;
      } catch (err) {
        console.error('Failed to process message:', msg.msg_id, err);
        // read_ct is incremented on every read, so it equals the attempts so far
        const attempts = msg.read_ct ?? 0;
        if (attempts >= 3) {
          // Give up: archive the message (pgmq's archive table acts as a simple DLQ)
          await supabase.schema('pgmq_public').rpc('archive', {
            queue_name: 'email_notifications',
            message_id: msg.msg_id,
          });
          // Log failed job
          await supabase.from('job_failures').insert({
            queue: 'email_notifications',
            message: msg.message,
            error: errorMessage(err),
            attempts,
          });
        }
        // Otherwise the message becomes visible again after the VT expires
        failed++;
      }
    }
    return new Response(JSON.stringify({ processed, failed }));
  } catch (error) {
    console.error('Queue processing error:', error);
    return new Response(JSON.stringify({ error: errorMessage(error) }), {
      status: 500,
    });
  }
});
// Catch variables are `unknown` in strict TypeScript; extract a readable message
function errorMessage(e: unknown): string {
  if (e instanceof Error) return e.message;
  if (typeof e === 'object' && e !== null && 'message' in e) {
    return String((e as { message: unknown }).message);
  }
  return String(e);
}
async function processEmailMessage(message: any) {
  const { type } = message;
  switch (type) {
    case 'welcome_email':
      await sendWelcomeEmail(message);
      break;
    case 'task_assigned':
      await sendTaskAssignedEmail(message);
      break;
    default:
      // Unknown types are logged and then deleted, not retried
      console.warn('Unknown message type:', type);
  }
}
// Shared SendGrid call used by both email types
async function sendEmail(to: string, subject: string, html: string) {
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${Deno.env.get('SENDGRID_API_KEY')}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: to }] }],
      from: { email: 'noreply@example.com', name: 'TaskFlow' },
      subject,
      content: [{ type: 'text/html', value: html }],
    }),
  });
  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }
}
async function sendWelcomeEmail(data: any) {
  // full_name is optional user metadata, so fall back to a generic greeting
  const name = data.name ?? 'there';
  await sendEmail(
    data.email,
    `Welcome to TaskFlow, ${name}!`,
    `<h1>Welcome ${name}!</h1><p>Thanks for joining...</p>`
  );
}
async function sendTaskAssignedEmail(data: any) {
  // Look up the assignee's email address
  const { data: assignee, error } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', data.assignee_id)
    .single();
  if (error || !assignee) throw new Error('Assignee not found');
  await sendEmail(
    assignee.email,
    `New task assigned: ${data.task_title}`,
    `<p>Hi ${assignee.full_name}, you have been assigned "${data.task_title}".</p>`
  );
}
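Pulling the consumer's retry policy into a pure function makes it testable without a database. This sketch mirrors the `>= 3` check in the Edge Function; the function and action names are hypothetical. Because pgmq increments `read_ct` on every read, the value seen during a delivery is the attempt number.

```typescript
// Hypothetical pure version of the consumer's per-message decision.
type QueueAction = "delete" | "retry_after_vt" | "archive";

function nextAction(succeeded: boolean, readCt: number, maxAttempts = 3): QueueAction {
  if (succeeded) return "delete"; // done: remove from the queue
  // Out of attempts: archive (and log to job_failures); otherwise let the VT expire
  return readCt >= maxAttempts ? "archive" : "retry_after_vt";
}

// How a message that fails on every delivery moves through the policy.
function actionsForAlwaysFailing(maxAttempts = 3): QueueAction[] {
  const actions: QueueAction[] = [];
  for (let readCt = 1; readCt <= maxAttempts; readCt++) {
    actions.push(nextAction(false, readCt, maxAttempts));
  }
  return actions;
}
```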
Cron: pg_cron for Daily Reports
-- Enable pg_cron (Dashboard → Database → Extensions)
CREATE EXTENSION IF NOT EXISTS pg_cron;
-- Create job: Daily stats aggregation at 1 AM UTC
SELECT cron.schedule(
  'daily-stats-aggregation',
  '0 1 * * *',
  $$
  -- Half-open ranges [yesterday, today) cover the whole day and can use indexes
  INSERT INTO daily_stats (date, new_tasks, completed_tasks, new_users)
  SELECT
    CURRENT_DATE - 1,
    (SELECT COUNT(*) FROM tasks
      WHERE created_at >= CURRENT_DATE - 1 AND created_at < CURRENT_DATE),
    (SELECT COUNT(*) FROM tasks
      WHERE completed_at >= CURRENT_DATE - 1 AND completed_at < CURRENT_DATE),
    (SELECT COUNT(*) FROM auth.users
      WHERE created_at >= CURRENT_DATE - 1 AND created_at < CURRENT_DATE);
  $$
);
-- Create job: Weekly cleanup on Sunday at 2 AM UTC
SELECT cron.schedule(
  'weekly-cleanup',
  '0 2 * * 0',
  $$
  -- Delete old audit logs
  DELETE FROM audit_log WHERE created_at < NOW() - INTERVAL '90 days';
  -- Delete archived messages from pgmq
  DELETE FROM pgmq.a_email_notifications WHERE archived_at < NOW() - INTERVAL '30 days';
  $$
);
-- VACUUM cannot run inside a transaction block, and a multi-statement job
-- command runs as one, so vacuum in a separate single-statement job
SELECT cron.schedule(
  'weekly-vacuum',
  '30 2 * * 0',
  'VACUUM ANALYZE tasks, audit_log'
);
-- View scheduled jobs
SELECT * FROM cron.job;
-- View job run history
SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;
-- Unschedule a job
SELECT cron.unschedule('daily-stats-aggregation');
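Implementation: Cloudflare Approach
Queue Setup
# wrangler.toml
# src/index.ts must export one default object combining the fetch (producer),
# queue (consumer) and scheduled (cron) handlers shown below
name = "job-processor"
main = "src/index.ts"
compatibility_date = "2024-09-23" # any recent date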
[[queues.producers]]
queue = "email-notifications"
binding = "EMAIL_QUEUE"
[[queues.consumers]]
queue = "email-notifications"
max_batch_size = 10
max_retries = 3
dead_letter_queue = "email-dlq"
[[queues.producers]]
queue = "email-dlq"
binding = "EMAIL_DLQ"
Producer Worker
// src/producer.ts
import { Hono } from 'hono';
interface Env {
  EMAIL_QUEUE: Queue;
  // Hypothetical shared secret, sent by the database webhook as a custom header
  WEBHOOK_SECRET: string;
}
const app = new Hono<{ Bindings: Env }>();
// Webhook endpoint for Supabase Database Webhooks (INSERT on auth.users)
app.post('/webhook/user-signup', async (c) => {
  // Reject requests that do not carry the shared secret
  if (c.req.header('x-webhook-secret') !== c.env.WEBHOOK_SECRET) {
    return c.json({ error: 'unauthorized' }, 401);
  }
  const { record } = await c.req.json();
  await c.env.EMAIL_QUEUE.send({
    type: 'welcome_email',
    userId: record.id,
    email: record.email,
    name: record.raw_user_meta_data?.full_name,
    timestamp: Date.now(),
  });
  return c.json({ success: true });
});
// API endpoint for manual enqueue (add authentication before exposing it publicly)
app.post('/api/notify', async (c) => {
  const body = await c.req.json();
  await c.env.EMAIL_QUEUE.send({
    type: body.type,
    data: body.data,
    timestamp: Date.now(),
  });
  return c.json({ queued: true });
});
export default app;
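The webhook route trusts whatever body it receives. Supabase Database Webhooks post a JSON body with `type`, `table`, `schema`, `record` and `old_record`; the sketch below types that body and maps it to the welcome message in a pure, testable function. The `AuthUserRow` subset and the function name are assumptions for illustration.

```typescript
// Hypothetical typing of a Supabase Database Webhook body.
interface DatabaseWebhookPayload<Row> {
  type: "INSERT" | "UPDATE" | "DELETE";
  table: string;
  schema: string;
  record: Row | null;
  old_record: Row | null;
}

// Assumed subset of auth.users columns that the producer reads
interface AuthUserRow {
  id: string;
  email: string | null;
  raw_user_meta_data?: { full_name?: string } | null;
}

interface WelcomeEmailMessage {
  type: "welcome_email";
  userId: string;
  email: string;
  name?: string;
  timestamp: number;
}

// Map a webhook body to the queue message, or null if no email should be sent.
function toWelcomeMessage(
  payload: DatabaseWebhookPayload<AuthUserRow>,
  now: number
): WelcomeEmailMessage | null {
  const record = payload.record;
  // Only new sign-ups that have an email address produce a welcome email
  if (payload.type !== "INSERT" || record === null || !record.email) return null;
  return {
    type: "welcome_email",
    userId: record.id,
    email: record.email,
    name: record.raw_user_meta_data?.full_name,
    timestamp: now,
  };
}
```

In the Hono route, `EMAIL_QUEUE.send()` would then run only when the mapper returns a message.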
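Consumer Worker
// src/consumer.ts
import { createClient } from '@supabase/supabase-js';
interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SENDGRID_API_KEY: string;
  // Producer binding for the DLQ; only needed to forward messages manually,
  // since dead_letter_queue in wrangler.toml routes exhausted messages automatically
  EMAIL_DLQ: Queue;
}
interface EmailMessage {
  type: string;
  userId?: string;
  email?: string;
  name?: string;
  // Assumed payload shape for task_assigned messages sent via /api/notify
  data?: { assignee_id?: string; task_title?: string };
  timestamp: number;
}
export default {
  async queue(
    batch: MessageBatch<EmailMessage>,
    env: Env
  ): Promise<void> {
    for (const message of batch.messages) {
      try {
        await processMessage(message.body, env);
        message.ack();
      } catch (error) {
        console.error('Failed to process message:', message.id, error);
        // Redeliver only this message; once max_retries is exhausted,
        // it is moved to the dead_letter_queue (email-dlq)
        message.retry();
      }
    }
  },
};
async function processMessage(msg: EmailMessage, env: Env) {
  switch (msg.type) {
    case 'welcome_email':
      await sendWelcomeEmail(msg, env);
      break;
    case 'task_assigned':
      await sendTaskAssignedEmail(msg, env);
      break;
    default:
      console.warn('Unknown message type:', msg.type);
  }
}
// Shared SendGrid call used by both email types
async function sendEmail(env: Env, to: string, subject: string, html: string) {
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: to }] }],
      from: { email: 'noreply@example.com', name: 'TaskFlow' },
      subject,
      content: [{ type: 'text/html', value: html }],
    }),
  });
  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }
}
async function sendWelcomeEmail(msg: EmailMessage, env: Env) {
  if (!msg.email) throw new Error('welcome_email message has no email');
  const name = msg.name ?? 'there';
  await sendEmail(env, msg.email, `Welcome to TaskFlow, ${name}!`, `<h1>Welcome ${name}!</h1>`);
}
async function sendTaskAssignedEmail(msg: EmailMessage, env: Env) {
  const assigneeId = msg.data?.assignee_id;
  if (!assigneeId) throw new Error('task_assigned message has no data.assignee_id');
  // Look up the assignee's email address
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);
  const { data: assignee, error } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', assigneeId)
    .single();
  if (error || !assignee) throw new Error(`Assignee not found: ${assigneeId}`);
  await sendEmail(
    env,
    assignee.email,
    `New task assigned: ${msg.data?.task_title ?? 'Untitled'}`,
    `<p>Hi ${assignee.full_name}, a new task has been assigned to you.</p>`
  );
}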
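A common refinement to per-message retries is exponential backoff: derive a delay from Cloudflare's `message.attempts` (1 on the first delivery) and pass it as `message.retry({ delaySeconds })`. The helper below is a sketch; the 10 s base and 1 h cap are illustrative values, and Cloudflare enforces its own maximum delay.

```typescript
// Exponential backoff: 10 s, 20 s, 40 s, ... capped at capSeconds.
// Assumes `attempts` is message.attempts (1 on the first delivery).
function backoffDelaySeconds(attempts: number, baseSeconds = 10, capSeconds = 3600): number {
  const exponent = Math.max(0, attempts - 1);
  return Math.min(capSeconds, baseSeconds * 2 ** exponent);
}

// "Full jitter" variant: a random delay in [0, backoff) spreads out retries
// from messages that failed together. `random` is injectable for testing.
function backoffWithJitter(attempts: number, random: () => number = Math.random): number {
  return Math.floor(random() * backoffDelaySeconds(attempts));
}
```

Inside the consumer's catch block this would read `message.retry({ delaySeconds: backoffDelaySeconds(message.attempts) })`.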
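Cron Trigger Worker
// src/cron.ts
import { createClient } from '@supabase/supabase-js';
interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SLACK_WEBHOOK_URL: string;
}
export default {
  async scheduled(
    controller: ScheduledController,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    // controller.cron is the expression (from wrangler.toml) that fired this run
    switch (controller.cron) {
      case '0 9 * * *': // Daily at 9 AM UTC
        await generateDailyReport(env);
        break;
      case '0 * * * *': // Every hour
        await checkSystemHealth(env);
        break;
      case '0 0 * * 1': // Weekly on Monday at midnight
        await sendWeeklySummary(env);
        break;
      default:
        console.warn('No handler for cron expression:', controller.cron);
    }
  },
};
// Sketch: sum the last 7 daily_stats rows and post them to Slack
async function sendWeeklySummary(env: Env) {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);
  const { data: rows, error } = await supabase
    .from('daily_stats')
    .select('new_tasks, completed_tasks, new_users')
    .order('date', { ascending: false })
    .limit(7);
  if (error) throw error;
  const total = (key: 'new_tasks' | 'completed_tasks' | 'new_users') =>
    (rows ?? []).reduce((sum, row) => sum + (row[key] ?? 0), 0);
  await fetch(env.SLACK_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      text: `📅 Weekly Summary\n• New tasks: ${total('new_tasks')}\n• Completed: ${total('completed_tasks')}\n• New users: ${total('new_users')}`,
    }),
  });
}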
async function generateDailyReport(env: Env) {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);
  // Yesterday as a half-open UTC window [start, end): an upper bound of
  // T23:59:59 with .lt() would drop events in the day's final second
  const end = new Date();
  end.setUTCHours(0, 0, 0, 0); // today 00:00 UTC
  const start = new Date(end.getTime() - 24 * 60 * 60 * 1000);
  const dateStr = start.toISOString().split('T')[0];
  const from = start.toISOString();
  const to = end.toISOString();
  const results = await Promise.all([
    supabase
      .from('tasks')
      .select('*', { count: 'exact', head: true })
      .gte('created_at', from)
      .lt('created_at', to),
    supabase
      .from('tasks')
      .select('*', { count: 'exact', head: true })
      .eq('status', 'completed')
      .gte('completed_at', from)
      .lt('completed_at', to),
    supabase
      .from('profiles')
      .select('*', { count: 'exact', head: true })
      .gte('created_at', from)
      .lt('created_at', to),
  ]);
  const failed = results.find((r) => r.error);
  if (failed) throw failed.error;
  const [newTasks, completedTasks, newUsers] = results.map((r) => r.count ?? 0);
  // Save to database
  const { error: insertError } = await supabase.from('daily_stats').insert({
    date: dateStr,
    new_tasks: newTasks,
    completed_tasks: completedTasks,
    new_users: newUsers,
  });
  if (insertError) throw insertError;
  // Send Slack notification
  await fetch(env.SLACK_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      text: `📊 Daily Report (${dateStr})\n• New tasks: ${newTasks}\n• Completed: ${completedTasks}\n• New users: ${newUsers}`,
    }),
  });
}
async function checkSystemHealth(env: Env) {
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);
// Check database connectivity
const { error } = await supabase.from('profiles').select('id').limit(1);
if (error) {
// Send alert
await fetch(env.SLACK_WEBHOOK_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
text: `🚨 ALERT: Database connectivity issue!\nError: ${error.message}`,
}),
});
}
}
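The day-window logic in `generateDailyReport` is easy to get subtly wrong (losing the last second, or drifting with local time), so it is worth isolating and testing. A hypothetical helper that returns yesterday as a half-open UTC range `[start, end)`:

```typescript
// Half-open UTC window [start, end) covering the calendar day before `now`.
interface DayWindow {
  dateStr: string; // YYYY-MM-DD, for the daily_stats.date column
  start: string; // ISO timestamp, inclusive
  end: string; // ISO timestamp, exclusive
}

function yesterdayUtcWindow(now: Date): DayWindow {
  // Midnight UTC today; UTC has no daylight-saving shifts, so 24 h earlier
  // is always the previous midnight
  const endMs = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate());
  const startMs = endMs - 24 * 60 * 60 * 1000;
  const start = new Date(startMs).toISOString();
  return { dateStr: start.slice(0, 10), start, end: new Date(endMs).toISOString() };
}
```

With it, each count query becomes `.gte('created_at', w.start).lt('created_at', w.end)` and the insert uses `w.dateStr`.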
# wrangler.toml - Cron configuration
[triggers]
crons = [
"0 9 * * *", # Daily at 9 AM UTC
"0 * * * *", # Every hour
"0 0 * * 1" # Weekly on Monday
]
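A `switch` in `scheduled()` needs a case for every schedule listed under `[triggers]`. An alternative is a lookup table that keeps schedules and handlers together, plus a check that flags configured crons with no handler. This is a sketch with hypothetical names; it does not parse wrangler.toml itself, so the configured list has to be passed in.

```typescript
// Sketch: dispatch Cron Trigger runs through a lookup table.
type CronHandler<E> = (env: E) => Promise<void>;

function hasHandler(handlers: Record<string, unknown>, cron: string): boolean {
  return Object.prototype.hasOwnProperty.call(handlers, cron);
}

// Run the handler registered for `cron`; resolves false if none exists.
async function dispatchCron<E>(
  cron: string,
  env: E,
  handlers: Record<string, CronHandler<E>>
): Promise<boolean> {
  if (!hasHandler(handlers, cron)) {
    console.warn("No handler registered for cron:", cron);
    return false;
  }
  await handlers[cron]!(env);
  return true;
}

// Schedules from [triggers] crons that would never be handled.
function unhandledCrons(configured: string[], handlers: Record<string, unknown>): string[] {
  return configured.filter((cron) => !hasHandler(handlers, cron));
}
```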
Decision Framework
Which Queue to Choose?
┌─────────────────────────────────────────────────────────────┐
│ QUEUE DECISION TREE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Need transaction with database? │ │
│ │ YES ──▶ Supabase pgmq │ │
│ │ NO ──▶ Continue │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼───────────────────────────────┐ │
│ │ High throughput (> 10K msg/min)? │ │
│ │ YES ──▶ Cloudflare Queues │ │
│ │ NO ──▶ Continue │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼───────────────────────────────┐ │
│ │ Need built-in retry/DLQ? │ │
│ │ YES ──▶ Cloudflare Queues │ │
│ │ NO ──▶ Supabase pgmq (simpler setup) │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
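The queue tree can be written as a function, which makes the order of the checks explicit: a transactional requirement outranks everything else. The field names are hypothetical, and the 10K msg/min threshold is the tree's own rule of thumb, not a platform limit.

```typescript
// The queue decision tree, checked in the same order as the diagram.
interface QueueNeeds {
  transactionalWithDb: boolean; // enqueue must commit/roll back with DB writes
  messagesPerMinute: number;
  wantsBuiltInRetryAndDlq: boolean;
}

type QueueChoice = "Supabase pgmq" | "Cloudflare Queues";

function chooseQueue(needs: QueueNeeds): QueueChoice {
  if (needs.transactionalWithDb) return "Supabase pgmq";
  if (needs.messagesPerMinute > 10_000) return "Cloudflare Queues";
  if (needs.wantsBuiltInRetryAndDlq) return "Cloudflare Queues";
  return "Supabase pgmq"; // simpler setup
}
```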
Which Cron to Choose?
┌─────────────────────────────────────────────────────────────┐
│ CRON DECISION TREE │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Job involves only database operations? │ │
│ │ YES ──▶ Supabase pg_cron (direct SQL) │ │
│ │ NO ──▶ Continue │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼───────────────────────────────┐ │
│ │ Need to call external APIs? │ │
│ │ YES ──▶ Cloudflare Cron Triggers │ │
│ │ NO ──▶ Continue │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────▼───────────────────────────────┐ │
│ │ Job runs longer than 30 seconds? │ │
│ │ YES ──▶ Split into queue jobs │ │
│ │ NO ──▶ Either option works │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
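The cron tree translates the same way. The function mirrors the diagram literally, including the fact that the 30-second runtime check is reached only when the first two answers are NO; field names are hypothetical.

```typescript
// The cron decision tree, checked in the same order as the diagram.
interface CronNeeds {
  databaseOnly: boolean;
  callsExternalApis: boolean;
  expectedRuntimeSeconds: number;
}

type CronChoice =
  | "Supabase pg_cron"
  | "Cloudflare Cron Triggers"
  | "Split into queue jobs"
  | "Either option";

function chooseCron(needs: CronNeeds): CronChoice {
  if (needs.databaseOnly) return "Supabase pg_cron"; // direct SQL
  if (needs.callsExternalApis) return "Cloudflare Cron Triggers";
  if (needs.expectedRuntimeSeconds > 30) return "Split into queue jobs";
  return "Either option";
}
```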
Monitoring
Supabase Monitoring
-- View pgmq queue stats
SELECT * FROM pgmq.metrics('email_notifications');
-- View recent job failures
SELECT * FROM job_failures
ORDER BY created_at DESC
LIMIT 20;
-- View pg_cron run history
SELECT
jobid,
runid,
job_pid,
database,
command,
status,
start_time,
end_time,
return_message
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 20;
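Rows from `cron.job_run_details` can be turned into a per-job health summary for a dashboard or alert. In this sketch (function and field names are hypothetical), only finished runs are counted: pg_cron sets `status` to `succeeded` or `failed` when a run ends, and in-progress states are skipped. `jobid` values map back to job names through `cron.job`.

```typescript
// Sketch: summarize cron.job_run_details rows per job.
interface JobRunRow {
  jobid: number;
  status: string;
  start_time: string; // ISO 8601 timestamp
}

interface JobHealth {
  jobid: number;
  finishedRuns: number;
  failures: number;
  lastFailureAt: string | null;
}

function summarizeJobRuns(rows: JobRunRow[]): JobHealth[] {
  const byJob = new Map<number, JobHealth>();
  for (const row of rows) {
    // Skip runs that have not finished yet (e.g. "running")
    if (row.status !== "succeeded" && row.status !== "failed") continue;
    const health: JobHealth = byJob.get(row.jobid) ?? {
      jobid: row.jobid,
      finishedRuns: 0,
      failures: 0,
      lastFailureAt: null,
    };
    health.finishedRuns += 1;
    if (row.status === "failed") {
      health.failures += 1;
      const isLater =
        health.lastFailureAt === null || Date.parse(row.start_time) > Date.parse(health.lastFailureAt);
      if (isLater) health.lastFailureAt = row.start_time;
    }
    byJob.set(row.jobid, health);
  }
  return Array.from(byJob.values()).sort((a, b) => a.jobid - b.jobid);
}
```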
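Cloudflare Monitoring
// Workers Analytics is available in the Dashboard;
// use Logpush for detailed logs.
// Inside the queue() handler loop, log one structured line per message:
const startTime = Date.now();
await processMessage(message.body, env);
message.ack();
console.log(JSON.stringify({
  type: 'job_processed',
  queue: batch.queue,
  messageId: message.id,
  attempts: message.attempts,
  duration: Date.now() - startTime,
}));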
Results
Implementation Complete
| Feature | Supabase | Cloudflare |
|---|---|---|
| Email queue | pgmq + Edge Function | Queue + Worker |
| Daily reports | pg_cron | Cron Trigger |
| Retry handling | Manual | Built-in |
| Monitoring | SQL queries | Dashboard |
Recommendations
| Use Case | Recommended |
|---|---|
| Email notifications | Cloudflare Queues |
| Database cleanup | Supabase pg_cron |
| External API calls | Cloudflare Cron |
| Transactional jobs | Supabase pgmq |
| High throughput | Cloudflare Queues |
Lessons Learned
Supabase Approach
Pros:
- Single platform, simpler infrastructure
- ACID transactions that include queue operations
- No extra cost beyond the Supabase plan
Cons:
- Manual retry implementation
- Limited throughput
- Consumer must be invoked periodically (polling)
Cloudflare Approach
Pros:
- Built-in retry and DLQ
- High throughput
- Built-in queue metrics in the Cloudflare Dashboard
Cons:
- Separate platform
- Additional cost
- Enqueueing cannot be part of a database transaction