
Case Study: Background Jobs (Queue & Cron)

Overview

Build a background jobs system with Queue and Cron, and compare the Supabase and Cloudflare implementations.

Requirements

  • Email notifications (queue-based)
  • Daily reports generation (cron)
  • Retry handling for failed jobs
  • Job monitoring and logging
  • Cleanup of old data (cron)

Expected Outcomes

  • Reliable job processing
  • Proper error handling
  • Monitoring dashboard
  • Comparison insights

Architecture

┌─────────────────────────────────────────────────────────────┐
│               BACKGROUND JOBS ARCHITECTURE                   │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  QUEUE JOBS (Event-driven)                                  │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                                                      │    │
│  │  Option A: Supabase pgmq                            │    │
│  │  ┌────────────────────────────────────────────┐     │    │
│  │  │  Database Trigger → pgmq.send() → Consumer │     │    │
│  │  │  (Edge Function polling or external)        │     │    │
│  │  └────────────────────────────────────────────┘     │    │
│  │                                                      │    │
│  │  Option B: Cloudflare Queues                        │    │
│  │  ┌────────────────────────────────────────────┐     │    │
│  │  │  Producer Worker → Queue → Consumer Worker  │     │    │
│  │  │  Built-in retry, DLQ                        │     │    │
│  │  └────────────────────────────────────────────┘     │    │
│  │                                                      │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
│  CRON JOBS (Time-based)                                     │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                                                      │    │
│  │  Option A: Supabase pg_cron                         │    │
│  │  ┌────────────────────────────────────────────┐     │    │
│  │  │  Scheduled SQL → Runs inside PostgreSQL     │     │    │
│  │  │  Great for: Data aggregation, cleanup       │     │    │
│  │  └────────────────────────────────────────────┘     │    │
│  │                                                      │    │
│  │  Option B: Cloudflare Cron Triggers                 │    │
│  │  ┌────────────────────────────────────────────┐     │    │
│  │  │  Scheduled Worker → Runs at edge            │     │    │
│  │  │  Great for: API calls, notifications        │     │    │
│  │  └────────────────────────────────────────────┘     │    │
│  │                                                      │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Comparison Table

Queue: Supabase pgmq vs Cloudflare Queues

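| Aspect | Supabase pgmq | Cloudflare Queues |
|---|---|---|
| Architecture | PostgreSQL extension | Distributed edge service |
| Latency | 10-50ms | 1-5ms |
| Throughput | Limited by DB | High, auto-scaling |
| Persistence | ACID, stored in DB tables | Durable, at-least-once delivery |
| Retry | Manual (visibility timeout + read_ct) | Built-in (max_retries, optional retry delay) |
| DLQ | Manual (archive table) | Built-in (dead_letter_queue) |
| Cost | Included in Supabase plan | $0.40 per million operations |
| Best for | DB-centric workflows | High-throughput workloads |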

Cron: Supabase pg_cron vs Cloudflare Cron Triggers

| Aspect | Supabase pg_cron | Cloudflare Cron Triggers |
|---|---|---|
| Runs in | PostgreSQL | Edge Worker |
| Language | SQL / PL/pgSQL | TypeScript / JavaScript |
| Timeout | Statement timeout | 30s (free) / 15min |
| Access | Direct database access | External APIs |
| Monitoring | cron.job_run_details | Workers Analytics |
| Best for | DB maintenance | External integrations |

Implementation: Supabase Approach

Setup pgmq

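-- Enable pgmq extension (Dashboard → Database → Extensions)
CREATE EXTENSION IF NOT EXISTS pgmq;

-- Create queues
SELECT pgmq.create('email_notifications');
SELECT pgmq.create('report_generation');
SELECT pgmq.create('cleanup_tasks');

-- Failure log written by the queue consumer (columns assumed by this guide)
CREATE TABLE IF NOT EXISTS public.job_failures (
  id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  queue text NOT NULL, message jsonb, error text, attempts int,
  created_at timestamptz NOT NULL DEFAULT now()
);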

Producer: Trigger-based Enqueue

-- SECURITY DEFINER functions pin search_path to '' (every name inside is schema-qualified)

-- Trigger: Enqueue email when user signs up
CREATE OR REPLACE FUNCTION enqueue_welcome_email()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pgmq.send(
    'email_notifications',
    jsonb_build_object(
      'type', 'welcome_email',
      'user_id', NEW.id,
      'email', NEW.email,
      'name', NEW.raw_user_meta_data->>'full_name',
      'created_at', NOW()
    )
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = '';

CREATE TRIGGER on_user_signup
  AFTER INSERT ON auth.users
  FOR EACH ROW
  EXECUTE FUNCTION enqueue_welcome_email();

-- Trigger: Enqueue notification when task assigned
CREATE OR REPLACE FUNCTION enqueue_task_notification()
RETURNS TRIGGER AS $$
BEGIN
  IF NEW.assignee_id IS DISTINCT FROM OLD.assignee_id
     AND NEW.assignee_id IS NOT NULL THEN
    PERFORM pgmq.send(
      'email_notifications',
      jsonb_build_object(
        'type', 'task_assigned',
        'task_id', NEW.id,
        'task_title', NEW.title,
        'assignee_id', NEW.assignee_id,
        'assigner_id', auth.uid()  -- NULL when the update does not come from a user request
      )
    );
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER SET search_path = '';

-- Fire only when the assignee_id column is part of the UPDATE
CREATE TRIGGER on_task_assigned
  AFTER UPDATE OF assignee_id ON tasks
  FOR EACH ROW
  EXECUTE FUNCTION enqueue_task_notification();
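Both triggers write to the same email_notifications queue, so the consumer has to tell payloads apart by their type field. A minimal sketch of that contract as a TypeScript discriminated union, assuming exactly the two payload shapes the triggers build and UUID ids serialized as strings; parseEmailJob is a hypothetical helper, not part of pgmq or supabase-js:

```typescript
// Payload built by enqueue_welcome_email()
type WelcomeEmailJob = {
  type: 'welcome_email';
  user_id: string;     // assumes auth.users.id (uuid) serialized as a string
  email: string;
  name: string | null; // raw_user_meta_data->>'full_name' may be NULL
  created_at: string;
};

// Payload built by enqueue_task_notification()
type TaskAssignedJob = {
  type: 'task_assigned';
  task_id: string;     // assumption: tasks.id is a uuid
  task_title: string;
  assignee_id: string;
  assigner_id: string | null; // auth.uid() is NULL outside a user request
};

type EmailJob = WelcomeEmailJob | TaskAssignedJob;

// Hypothetical guard: returns a typed job, or null for unknown or malformed payloads
function parseEmailJob(raw: unknown): EmailJob | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const msg = raw as Record<string, unknown>;
  if (msg.type === 'welcome_email' && typeof msg.email === 'string') {
    return msg as unknown as WelcomeEmailJob;
  }
  if (msg.type === 'task_assigned' && typeof msg.assignee_id === 'string') {
    return msg as unknown as TaskAssignedJob;
  }
  return null;
}

// Narrowing on `type` gives typed access to each variant's fields
const job = parseEmailJob({
  type: 'task_assigned',
  task_id: 't1',
  task_title: 'Fix bug',
  assignee_id: 'u2',
  assigner_id: null,
});
if (job?.type === 'task_assigned') {
  console.log(job.task_title); // prints "Fix bug"
}
```

Returning null for unknown types lets the consumer log and delete such messages instead of retrying them forever.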

Consumer: Edge Function

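// supabase/functions/process-email-queue/index.ts
// Invoked on a schedule (for example every minute via pg_cron + pg_net) to drain the queue.
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2';

const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);

// pgmq's own functions live in the pgmq schema, which PostgREST does not expose.
// Supabase Queues provides wrappers in the pgmq_public schema once the
// "Expose Queues via PostgREST" setting is enabled.
const queue = supabase.schema('pgmq_public');
const QUEUE_NAME = 'email_notifications';
const MAX_ATTEMPTS = 3;

interface QueueMessage {
  msg_id: number;
  read_ct: number; // number of times the message has been read (1 on first delivery)
  message: Record<string, any>;
}

// Works for Error instances and for plain { message } error objects
function errorMessage(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (typeof err === 'object' && err !== null && 'message' in err) {
    return String((err as { message: unknown }).message);
  }
  return String(err);
}

Deno.serve(async () => {
  try {
    // Read up to 10 messages, hidden from other readers for 60 seconds
    const { data, error } = await queue.rpc('read', {
      queue_name: QUEUE_NAME,
      sleep_seconds: 60, // visibility timeout
      n: 10,
    });

    if (error) throw error;
    const messages = (data ?? []) as QueueMessage[];
    if (messages.length === 0) {
      return Response.json({ processed: 0, failed: 0 });
    }

    let processed = 0;
    let failed = 0;

    for (const msg of messages) {
      try {
        await processEmailMessage(msg.message);

        // Delete successfully processed message
        const { error: deleteError } = await queue.rpc('delete', {
          queue_name: QUEUE_NAME,
          message_id: msg.msg_id,
        });
        // The email already went out; a failed delete means it may be redelivered
        if (deleteError) {
          console.error('Delete failed:', msg.msg_id, errorMessage(deleteError));
        }

        processed++;
      } catch (err) {
        console.error('Failed to process message:', msg.msg_id, errorMessage(err));

        if (msg.read_ct >= MAX_ATTEMPTS) {
          // Give up: archive the message (the archive table acts as the DLQ here)
          await queue.rpc('archive', {
            queue_name: QUEUE_NAME,
            message_id: msg.msg_id,
          });

          // Log failed job
          await supabase.from('job_failures').insert({
            queue: QUEUE_NAME,
            message: msg.message,
            error: errorMessage(err),
            attempts: msg.read_ct,
          });
        }
        // Otherwise, the message becomes visible again when the visibility timeout expires

        failed++;
      }
    }

    return Response.json({ processed, failed });
  } catch (error) {
    console.error('Queue processing error:', error);
    return Response.json({ error: errorMessage(error) }, { status: 500 });
  }
});

async function processEmailMessage(message: Record<string, any>) {
  switch (message.type) {
    case 'welcome_email':
      await sendWelcomeEmail(message);
      break;
    case 'task_assigned':
      await sendTaskAssignedEmail(message);
      break;
    default:
      // Logged, then deleted by the caller so it does not loop forever
      console.warn('Unknown message type:', message.type);
  }
}

// Shared SendGrid v3 mail/send call
async function sendEmail(to: string, subject: string, html: string) {
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${Deno.env.get('SENDGRID_API_KEY')}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: to }] }],
      from: { email: 'noreply@example.com', name: 'TaskFlow' },
      subject,
      content: [{ type: 'text/html', value: html }],
    }),
  });

  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }
}

async function sendWelcomeEmail(data: Record<string, any>) {
  // full_name may be missing from raw_user_meta_data
  const greeting = data.name ? `, ${data.name}` : '';
  await sendEmail(
    data.email,
    `Welcome to TaskFlow${greeting}!`,
    `<h1>Welcome${greeting}!</h1><p>Thanks for joining...</p>`
  );
}

async function sendTaskAssignedEmail(data: Record<string, any>) {
  // Get assignee email (assumes profiles has email and full_name columns)
  const { data: assignee, error } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', data.assignee_id)
    .single();

  if (error || !assignee) throw new Error('Assignee not found');

  // Example content
  await sendEmail(
    assignee.email,
    `New task assigned: ${data.task_title}`,
    `<p>Hi ${assignee.full_name ?? 'there'}, you have been assigned "${data.task_title}".</p>`
  );
}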
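The consumer's retry behaviour comes entirely from pgmq's visibility timeout: a read hides the message for the given number of seconds and increments read_ct, and a message that is neither deleted nor archived reappears once that timeout expires. The class below is a toy in-memory model of those semantics for illustration only (it is not how pgmq is implemented; ToyVtQueue and nowSec are hypothetical names), with time passed in explicitly so each step is easy to follow:

```typescript
// Toy in-memory model of visibility-timeout queue semantics (illustration only)
interface ToyMessage<T> {
  msg_id: number;
  read_ct: number; // incremented on every read, like pgmq's read_ct
  vt: number;      // time (seconds) before which the message stays hidden
  message: T;
}

class ToyVtQueue<T> {
  private nextId = 1;
  private messages = new Map<number, ToyMessage<T>>();
  readonly archived: ToyMessage<T>[] = [];

  send(message: T, nowSec: number): number {
    const msg_id = this.nextId++;
    this.messages.set(msg_id, { msg_id, read_ct: 0, vt: nowSec, message });
    return msg_id;
  }

  // Return up to `qty` visible messages and hide them for `vtSec` seconds
  read(vtSec: number, qty: number, nowSec: number): ToyMessage<T>[] {
    const visible = Array.from(this.messages.values())
      .filter((m) => m.vt <= nowSec)
      .slice(0, qty);
    for (const m of visible) {
      m.read_ct += 1;
      m.vt = nowSec + vtSec;
    }
    return visible.map((m) => ({ ...m }));
  }

  // Successful processing: remove the message for good
  delete(msgId: number): boolean {
    return this.messages.delete(msgId);
  }

  // Give up: move the message out of the queue into the archive
  archive(msgId: number): boolean {
    const m = this.messages.get(msgId);
    if (!m) return false;
    this.messages.delete(msgId);
    this.archived.push(m);
    return true;
  }
}

const q = new ToyVtQueue<string>();
const id = q.send('hello', 0);
const first = q.read(60, 10, 0);   // delivered, read_ct = 1, hidden until t = 60
const hidden = q.read(60, 10, 30); // empty: the visibility timeout has not expired
const second = q.read(60, 10, 61); // redelivered (the implicit retry), read_ct = 2
q.archive(id);
```

This is why the consumer checks read_ct rather than keeping its own counter: with MAX_ATTEMPTS = 3 the message is archived during the third delivery, and the visibility timeout (60 seconds in the consumer) is effectively the retry delay.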

Cron: pg_cron for Daily Reports

-- Enable pg_cron (Dashboard → Database → Extensions)
CREATE EXTENSION IF NOT EXISTS pg_cron;

-- Create job: Daily stats aggregation at 1 AM UTC
-- (same daily_stats columns as the Cloudflare cron worker writes)
SELECT cron.schedule(
  'daily-stats-aggregation',
  '0 1 * * *',
  $$
  INSERT INTO daily_stats (date, new_tasks, completed_tasks, new_users)
  SELECT
    CURRENT_DATE - 1,
    COUNT(*) FILTER (WHERE DATE(created_at) = CURRENT_DATE - 1),
    COUNT(*) FILTER (WHERE DATE(completed_at) = CURRENT_DATE - 1),
    (SELECT COUNT(*) FROM auth.users WHERE DATE(created_at) = CURRENT_DATE - 1)
  FROM tasks;
  $$
);

-- Create job: Weekly cleanup on Sunday at 2 AM
SELECT cron.schedule(
  'weekly-cleanup',
  '0 2 * * 0',
  $$
  -- Delete old audit logs
  DELETE FROM audit_log WHERE created_at < NOW() - INTERVAL '90 days';

  -- Delete archived messages from pgmq
  DELETE FROM pgmq.a_email_notifications WHERE archived_at < NOW() - INTERVAL '30 days';
  $$
);

-- VACUUM cannot run inside a transaction block, and a multi-statement
-- command runs as one, so schedule it as a separate single-statement job
SELECT cron.schedule(
  'weekly-vacuum',
  '30 2 * * 0',
  'VACUUM ANALYZE tasks, audit_log'
);

-- View scheduled jobs
SELECT * FROM cron.job;

-- View job run history
SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;

-- Unschedule a job
SELECT cron.unschedule('daily-stats-aggregation');
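The schedules above use standard five-field cron syntax: minute, hour, day of month, month, day of week (0 = Sunday), and this guide assumes they are evaluated in UTC. To make the fields concrete, here is a deliberately minimal matcher that understands only * and single numbers (no ranges, lists, steps, or 7 for Sunday); cronMatches is a hypothetical helper, not part of pg_cron or Cloudflare:

```typescript
// Minimal 5-field cron matcher: supports '*' and plain integers only, evaluated in UTC
function cronMatches(expr: string, date: Date): boolean {
  const fields = expr.trim().split(/\s+/);
  if (fields.length !== 5) throw new Error(`expected 5 fields, got ${fields.length}`);
  const [min, hour, dom, mon, dow] = fields;

  const matches = (field: string, value: number): boolean => {
    if (field === '*') return true;
    if (!/^\d+$/.test(field)) throw new Error(`unsupported field: ${field}`);
    return Number(field) === value;
  };

  const timeOk =
    matches(min, date.getUTCMinutes()) &&
    matches(hour, date.getUTCHours()) &&
    matches(mon, date.getUTCMonth() + 1); // cron months are 1-12

  // Classic cron rule: when both day fields are restricted, either one may match
  const domOk = matches(dom, date.getUTCDate());
  const dowOk = matches(dow, date.getUTCDay()); // 0 = Sunday
  const dayOk = dom !== '*' && dow !== '*' ? domOk || dowOk : domOk && dowOk;

  return timeOk && dayOk;
}

// 2024-01-07 was a Sunday, so the weekly cleanup schedule fires at 02:00 UTC
console.log(cronMatches('0 2 * * 0', new Date('2024-01-07T02:00:00Z'))); // true
console.log(cronMatches('0 2 * * 0', new Date('2024-01-08T02:00:00Z'))); // false (Monday)
```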

Implementation: Cloudflare Approach

Queue Setup

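# wrangler.toml
# One Worker: src/index.ts is expected to combine the fetch (producer),
# queue (consumer) and scheduled (cron) handlers in a single default export
name = "job-processor"
main = "src/index.ts"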

[[queues.producers]]
queue = "email-notifications"
binding = "EMAIL_QUEUE"

[[queues.consumers]]
queue = "email-notifications"
max_batch_size = 10
max_retries = 3
dead_letter_queue = "email-dlq"

[[queues.producers]]
queue = "email-dlq"
binding = "EMAIL_DLQ"

Producer Worker

// src/producer.ts
import { Hono } from 'hono';

interface Env {
  EMAIL_QUEUE: Queue;
  // Shared secret for webhook routes (assumed: sent by the Supabase
  // database webhook as a custom x-webhook-secret header)
  WEBHOOK_SECRET: string;
}

const app = new Hono<{ Bindings: Env }>();

// Reject webhook calls that do not carry the shared secret
app.use('/webhook/*', async (c, next) => {
  if (c.req.header('x-webhook-secret') !== c.env.WEBHOOK_SECRET) {
    return c.json({ error: 'unauthorized' }, 401);
  }
  await next();
});

// Webhook endpoint for Supabase database webhooks (the INSERT payload carries `record`)
app.post('/webhook/user-signup', async (c) => {
  const { record } = await c.req.json();

  await c.env.EMAIL_QUEUE.send({
    type: 'welcome_email',
    userId: record.id,
    email: record.email,
    name: record.raw_user_meta_data?.full_name,
    timestamp: Date.now(),
  });

  return c.json({ success: true });
});

// API endpoint for manual enqueue (add authentication before exposing it publicly)
app.post('/api/notify', async (c) => {
  const body = await c.req.json();

  await c.env.EMAIL_QUEUE.send({
    type: body.type,
    data: body.data,
    timestamp: Date.now(),
  });

  return c.json({ queued: true });
});

export default app;
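The webhook handler trusts whatever JSON arrives. A minimal sketch of validating the Supabase Database Webhook body (which carries type, table, schema, record and old_record) before mapping it to a queue message; toWelcomeEmail and WelcomeEmail are hypothetical names, and the record fields are assumed to mirror auth.users:

```typescript
interface WebhookPayload {
  type: 'INSERT' | 'UPDATE' | 'DELETE';
  table: string;
  schema: string;
  record: Record<string, unknown> | null;
  old_record: Record<string, unknown> | null;
}

interface WelcomeEmail {
  type: 'welcome_email';
  userId: string;
  email: string;
  name?: string;
  timestamp: number;
}

// Returns the queue message for a new-user INSERT, or null if the payload does not qualify
function toWelcomeEmail(body: unknown, now: number = Date.now()): WelcomeEmail | null {
  if (typeof body !== 'object' || body === null) return null;
  const payload = body as Partial<WebhookPayload>;
  if (payload.type !== 'INSERT' || !payload.record) return null;

  const { id, email, raw_user_meta_data } = payload.record;
  if (typeof id !== 'string' || typeof email !== 'string') return null;

  // full_name is optional user metadata
  const meta = (raw_user_meta_data ?? {}) as { full_name?: unknown };
  return {
    type: 'welcome_email',
    userId: id,
    email,
    name: typeof meta.full_name === 'string' ? meta.full_name : undefined,
    timestamp: now,
  };
}
```

In the route, the handler would call toWelcomeEmail on the parsed body, answer 400 when it returns null, and otherwise pass the result to EMAIL_QUEUE.send().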

Consumer Worker

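// src/consumer.ts
import { createClient, type SupabaseClient } from '@supabase/supabase-js';

interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SENDGRID_API_KEY: string;
  EMAIL_DLQ: Queue;
}

interface EmailMessage {
  type: string;
  userId?: string;
  email?: string;
  name?: string;
  data?: any;
  timestamp: number;
}

export default {
  async queue(
    batch: MessageBatch<EmailMessage>,
    env: Env
  ): Promise<void> {
    const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

    for (const message of batch.messages) {
      try {
        await processMessage(message.body, env, supabase);
        message.ack();
      } catch (error) {
        console.error('Failed to process message:', message.id, error);

        // Retry only this message; once max_retries is exhausted,
        // Cloudflare Queues moves it to the dead_letter_queue (email-dlq)
        message.retry();
      }
    }
  },
};

async function processMessage(msg: EmailMessage, env: Env, supabase: SupabaseClient) {
  switch (msg.type) {
    case 'welcome_email':
      await sendWelcomeEmail(msg, env);
      break;
    case 'task_assigned':
      await sendTaskAssignedEmail(msg, env, supabase);
      break;
    default:
      console.warn('Unknown message type:', msg.type);
  }
}

// Shared SendGrid v3 mail/send call
async function sendEmail(env: Env, to: string, subject: string, html: string) {
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [{ to: [{ email: to }] }],
      from: { email: 'noreply@example.com', name: 'TaskFlow' },
      subject,
      content: [{ type: 'text/html', value: html }],
    }),
  });

  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }
}

async function sendWelcomeEmail(msg: EmailMessage, env: Env) {
  if (!msg.email) throw new Error('welcome_email message has no email');
  const greeting = msg.name ? `, ${msg.name}` : '';
  await sendEmail(env, msg.email, `Welcome to TaskFlow${greeting}!`, `<h1>Welcome${greeting}!</h1>`);
}

// task_assigned messages arrive through /api/notify as { type, data }.
// Assumes data carries assignee_id and task_title, and profiles has an email column.
async function sendTaskAssignedEmail(msg: EmailMessage, env: Env, supabase: SupabaseClient) {
  const { data: assignee, error } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', msg.data?.assignee_id)
    .single();

  if (error || !assignee) throw new Error('Assignee not found');

  // Example content
  await sendEmail(
    env,
    assignee.email,
    `New task assigned: ${msg.data?.task_title}`,
    `<p>You have been assigned "${msg.data?.task_title}".</p>`
  );
}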
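A bare message.retry() redelivers according to the consumer's queue settings. Cloudflare Queues also accepts message.retry({ delaySeconds }), and each message exposes an attempts count (assumed here to start at 1 on the first delivery), so backoff can be computed per message. A minimal sketch of capped exponential backoff; the 10-second base and 1-hour cap are assumptions to tune, and retryDelaySeconds is a hypothetical helper:

```typescript
// Capped exponential backoff: base * 2^(attempts - 1), never above capSeconds.
// `attempts` is the delivery count, assumed to start at 1 like message.attempts.
function retryDelaySeconds(attempts: number, baseSeconds = 10, capSeconds = 3600): number {
  if (!Number.isInteger(attempts) || attempts < 1) {
    throw new RangeError('attempts must be a positive integer');
  }
  const delay = baseSeconds * 2 ** (attempts - 1);
  return Math.min(delay, capSeconds);
}

// In the consumer's catch block (sketch):
//   message.retry({ delaySeconds: retryDelaySeconds(message.attempts) });

console.log([1, 2, 3, 4].map((a) => retryDelaySeconds(a))); // [ 10, 20, 40, 80 ]
```

Backoff gives a flaky downstream such as the email API time to recover instead of burning all max_retries attempts within seconds.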

Cron Trigger Worker

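// src/cron.ts
import { createClient } from '@supabase/supabase-js';

interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SLACK_WEBHOOK_URL: string;
}

export default {
  async scheduled(
    controller: ScheduledController,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    // controller.cron is the schedule that fired, exactly as written in wrangler.toml
    switch (controller.cron) {
      case '0 9 * * *': // Daily at 9 AM UTC
        await generateDailyReport(env);
        break;

      case '0 * * * *': // Every hour
        await checkSystemHealth(env);
        break;

      case '0 0 * * 1': // Weekly on Monday at midnight UTC
        await sendWeeklySummary(env);
        break;

      default:
        console.warn('No handler for cron:', controller.cron);
    }
  },
};

// Post a plain-text message to the Slack incoming webhook
async function postToSlack(env: Env, text: string) {
  const response = await fetch(env.SLACK_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ text }),
  });
  if (!response.ok) throw new Error(`Slack webhook error: ${response.status}`);
}

async function generateDailyReport(env: Env) {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  // Yesterday as a half-open UTC range [from, to), so no timestamp falls in a gap
  const today = new Date();
  today.setUTCHours(0, 0, 0, 0);
  const yesterday = new Date(today.getTime() - 24 * 60 * 60 * 1000);
  const dateStr = yesterday.toISOString().split('T')[0];
  const from = yesterday.toISOString();
  const to = today.toISOString();

  const [tasksRes, completedRes, usersRes] = await Promise.all([
    supabase
      .from('tasks')
      .select('*', { count: 'exact', head: true })
      .gte('created_at', from)
      .lt('created_at', to),
    supabase
      .from('tasks')
      .select('*', { count: 'exact', head: true })
      .eq('status', 'completed')
      .gte('completed_at', from)
      .lt('completed_at', to),
    supabase
      .from('profiles')
      .select('*', { count: 'exact', head: true })
      .gte('created_at', from)
      .lt('created_at', to),
  ]);

  const failedQuery = [tasksRes, completedRes, usersRes].find((r) => r.error);
  if (failedQuery?.error) throw failedQuery.error;

  const newTasks = tasksRes.count ?? 0;
  const completedTasks = completedRes.count ?? 0;
  const newUsers = usersRes.count ?? 0;

  // Save to database
  const { error } = await supabase.from('daily_stats').insert({
    date: dateStr,
    new_tasks: newTasks,
    completed_tasks: completedTasks,
    new_users: newUsers,
  });
  if (error) throw error;

  // Send Slack notification
  await postToSlack(
    env,
    `📊 Daily Report (${dateStr})\n• New tasks: ${newTasks}\n• Completed: ${completedTasks}\n• New users: ${newUsers}`
  );
}

async function checkSystemHealth(env: Env) {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  // Check database connectivity with a cheap query
  const { error } = await supabase.from('profiles').select('id').limit(1);

  if (error) {
    // Send alert
    await postToSlack(env, `🚨 ALERT: Database connectivity issue!\nError: ${error.message}`);
  }
}

// Minimal weekly summary built from the last 7 rows of daily_stats
async function sendWeeklySummary(env: Env) {
  const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

  const { data, error } = await supabase
    .from('daily_stats')
    .select('new_tasks, completed_tasks, new_users')
    .order('date', { ascending: false })
    .limit(7);
  if (error) throw error;

  const rows = data ?? [];
  const totals = rows.reduce(
    (acc, d) => ({
      newTasks: acc.newTasks + (d.new_tasks ?? 0),
      completed: acc.completed + (d.completed_tasks ?? 0),
      newUsers: acc.newUsers + (d.new_users ?? 0),
    }),
    { newTasks: 0, completed: 0, newUsers: 0 }
  );

  await postToSlack(
    env,
    `📈 Weekly Summary (last ${rows.length} days)\n• New tasks: ${totals.newTasks}\n• Completed: ${totals.completed}\n• New users: ${totals.newUsers}`
  );
}
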
# wrangler.toml - Cron configuration
[triggers]
crons = [
  "0 9 * * *",   # Daily at 9 AM UTC
  "0 * * * *",   # Every hour
  "0 0 * * 1"    # Weekly on Monday
]

Decision Framework

Which queue to choose?

┌─────────────────────────────────────────────────────────────┐
│                    QUEUE DECISION TREE                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Need transaction with database?                       │   │
│  │   YES ──▶ Supabase pgmq                              │   │
│  │   NO ──▶ Continue                                    │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │ High throughput (> 10K msg/min)?                     │   │
│  │   YES ──▶ Cloudflare Queues                          │   │
│  │   NO ──▶ Continue                                    │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │ Need built-in retry/DLQ?                             │   │
│  │   YES ──▶ Cloudflare Queues                          │   │
│  │   NO ──▶ Supabase pgmq (simpler setup)              │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘
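The tree reads naturally as a small function. The sketch below encodes it literally, with the 10K msg/min threshold taken from the tree; the QueueNeeds shape and the chooseQueue name are illustrative:

```typescript
interface QueueNeeds {
  needsDbTransaction: boolean;   // enqueue must commit or roll back with DB writes
  messagesPerMinute: number;
  needsBuiltInRetryDlq: boolean;
}

type QueueChoice = 'Supabase pgmq' | 'Cloudflare Queues';

// Literal encoding of the queue decision tree; the first matching rule wins
function chooseQueue(n: QueueNeeds): QueueChoice {
  if (n.needsDbTransaction) return 'Supabase pgmq';
  if (n.messagesPerMinute > 10_000) return 'Cloudflare Queues';
  if (n.needsBuiltInRetryDlq) return 'Cloudflare Queues';
  return 'Supabase pgmq'; // simpler setup
}

console.log(chooseQueue({ needsDbTransaction: false, messagesPerMinute: 500, needsBuiltInRetryDlq: true }));
// Cloudflare Queues
```

Because the transaction question comes first, a workload that needs both transactional enqueue and very high throughput still lands on pgmq; that is exactly where the "Limited by DB" throughput row in the comparison table becomes the constraint to watch.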

Which cron to choose?

┌─────────────────────────────────────────────────────────────┐
│                    CRON DECISION TREE                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Job involves only database operations?               │   │
│  │   YES ──▶ Supabase pg_cron (direct SQL)             │   │
│  │   NO ──▶ Continue                                    │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │ Need to call external APIs?                          │   │
│  │   YES ──▶ Cloudflare Cron Triggers                   │   │
│  │   NO ──▶ Continue                                    │   │
│  └──────────────────────────────────────────────────────┘   │
│                          │                                   │
│  ┌──────────────────────▼───────────────────────────────┐   │
│  │ Job runs longer than 30 seconds?                     │   │
│  │   YES ──▶ Split into queue jobs                      │   │
│  │   NO ──▶ Either option works                         │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘
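For the "split into queue jobs" branch, one common pattern is fan-out: the cron handler only enumerates the work and enqueues small chunks, and queue consumers do the slow part. A minimal sketch of the chunking step; the ReportChunk message shape, the planFanOut name and the REPORT_QUEUE binding in the comment are hypothetical, and the per-batch message and size limits of sendBatch() should be checked against current Cloudflare documentation:

```typescript
interface ReportChunk {
  type: 'report_chunk';
  jobId: string;
  chunkIndex: number;
  userIds: string[];
}

// Split a large list of ids into fixed-size chunks, one queue message per chunk
function planFanOut(jobId: string, userIds: string[], chunkSize: number): ReportChunk[] {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError('chunkSize must be a positive integer');
  }
  const chunks: ReportChunk[] = [];
  for (let i = 0; i < userIds.length; i += chunkSize) {
    chunks.push({
      type: 'report_chunk',
      jobId,
      chunkIndex: chunks.length,
      userIds: userIds.slice(i, i + chunkSize),
    });
  }
  return chunks;
}

// In a scheduled() handler (sketch, assuming a REPORT_QUEUE producer binding):
//   const chunks = planFanOut(crypto.randomUUID(), ids, 50);
//   await env.REPORT_QUEUE.sendBatch(chunks.map((body) => ({ body })));

console.log(planFanOut('job-1', ['a', 'b', 'c', 'd', 'e'], 2).map((c) => c.userIds));
// [ [ 'a', 'b' ], [ 'c', 'd' ], [ 'e' ] ]
```

Each chunk then runs as an independent queue message with its own retries, so one failing chunk does not force the whole job to rerun.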

Monitoring

Supabase Monitoring

-- View pgmq queue stats
SELECT * FROM pgmq.metrics('email_notifications');

-- View recent job failures
SELECT * FROM job_failures
ORDER BY created_at DESC
LIMIT 20;

-- View pg_cron run history
SELECT
  jobid,
  runid,
  job_pid,
  database,
  command,
  status,
  start_time,
  end_time,
  return_message
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 20;
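pgmq.metrics() returns one row per queue, including columns such as queue_length and oldest_msg_age_sec. A minimal sketch of turning such a row into alerts, for example from the hourly health-check job; the thresholds and the QueueMetrics and queueAlerts names are assumptions, and the column set should be checked against the installed pgmq version:

```typescript
// Subset of the columns returned by pgmq.metrics(queue_name)
interface QueueMetrics {
  queue_name: string;
  queue_length: number;
  oldest_msg_age_sec: number | null; // may be null when the queue is empty
}

interface Thresholds {
  maxLength: number;
  maxOldestAgeSec: number;
}

// Return human-readable alerts; an empty array means the queue looks healthy
function queueAlerts(m: QueueMetrics, t: Thresholds): string[] {
  const alerts: string[] = [];
  if (m.queue_length > t.maxLength) {
    alerts.push(`${m.queue_name}: ${m.queue_length} messages waiting (limit ${t.maxLength})`);
  }
  if (m.oldest_msg_age_sec !== null && m.oldest_msg_age_sec > t.maxOldestAgeSec) {
    alerts.push(
      `${m.queue_name}: oldest message is ${m.oldest_msg_age_sec}s old (limit ${t.maxOldestAgeSec}s)`
    );
  }
  return alerts;
}

console.log(
  queueAlerts(
    { queue_name: 'email_notifications', queue_length: 500, oldest_msg_age_sec: 30 },
    { maxLength: 100, maxOldestAgeSec: 300 }
  )
);
// [ 'email_notifications: 500 messages waiting (limit 100)' ]
```

An old oldest_msg_age_sec is usually the more telling signal: it means the consumer is not running or keeps failing, even when the queue is short.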

Cloudflare Monitoring

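// Workers Analytics is available in the Cloudflare dashboard;
// use Logpush for detailed logs.

// In the consumer's queue() handler, per message: log one structured JSON line
const startTime = Date.now();
// ... process the message ...
console.log(JSON.stringify({
  type: 'job_processed',
  queue: batch.queue,
  messageId: message.id,
  attempts: message.attempts,
  duration: Date.now() - startTime,
}));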
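Rather than repeating the startTime bookkeeping in every handler, the structured log line can come from a small wrapper. A minimal sketch; withJobLog is a hypothetical helper, and the log sink and clock are injectable (defaulting to console.log and Date.now) so the output can be tested:

```typescript
interface JobLog {
  type: 'job_processed' | 'job_failed';
  queue: string;
  messageId: string;
  duration: number; // milliseconds
  error?: string;
}

// Run `fn`, emit one JSON log line with its outcome and duration, and rethrow failures
async function withJobLog<T>(
  queue: string,
  messageId: string,
  fn: () => Promise<T>,
  log: (line: string) => void = console.log,
  now: () => number = Date.now,
): Promise<T> {
  const startTime = now();
  try {
    const result = await fn();
    const entry: JobLog = { type: 'job_processed', queue, messageId, duration: now() - startTime };
    log(JSON.stringify(entry));
    return result;
  } catch (err) {
    const entry: JobLog = {
      type: 'job_failed',
      queue,
      messageId,
      duration: now() - startTime,
      error: err instanceof Error ? err.message : String(err),
    };
    log(JSON.stringify(entry));
    throw err; // the caller still decides between ack() and retry()
  }
}
```

Inside the consumer loop, the processing call can be wrapped as `await withJobLog(batch.queue, message.id, () => handle(message.body))`, where handle stands for the existing per-message processing function; because the wrapper rethrows, the existing catch block that calls message.retry() keeps working unchanged.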

Results

Implementation Complete

| Feature | Supabase | Cloudflare |
|---|---|---|
| Email queue | pgmq + Edge Function | Queue + Worker |
| Daily reports | pg_cron | Cron Trigger |
| Retry handling | Manual | Built-in |
| Monitoring | SQL queries | Dashboard |

Recommendations

| Use Case | Recommended |
|---|---|
| Email notifications | Cloudflare Queues |
| Database cleanup | Supabase pg_cron |
| External API calls | Cloudflare Cron |
| Transactional jobs | Supabase pgmq |
| High throughput | Cloudflare Queues |

Lessons Learned

Supabase Approach

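Pros:
  • Single platform, simpler infrastructure
  • Enqueueing can share an ACID transaction with database writes
  • No extra cost beyond the existing Supabase plan

Cons:
  • Retry and dead-letter handling must be implemented manually
  • Throughput limited by the database
  • The consumer has to poll (or be invoked on a schedule)

Cloudflare Approach

Pros:
  • Built-in retry and DLQ
  • High throughput
  • Built-in dashboard analytics for queues and Workers

Cons:
  • Separate platform to operate
  • Additional cost ($0.40 per million operations)
  • Enqueueing cannot share a transaction with database writes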


References