Bỏ qua

Phase 5: Queue Processing

Mục tiêu

Implement background job processing với Supabase pgmq và Cloudflare Queues.

Thời gian ước tính: 3 giờ


Overview: Chọn Queue nào?

Use Case Supabase pgmq Cloudflare Queue
Email notifications ✅ Recommended
Activity logging ✅ Recommended
DB-heavy operations
High throughput

Trong phase này, chúng ta sẽ implement cả hai để so sánh.


Part A: Supabase pgmq (Database Queue)

Step 1: Enable pgmq Extension

-- Enable pgmq in Supabase Dashboard or via SQL
CREATE EXTENSION IF NOT EXISTS pgmq;

Step 2: Create Queues

-- supabase/migrations/20240101000010_create_queues.sql

-- Create activity log queue
SELECT pgmq.create('activity_log');

-- Create notification queue
SELECT pgmq.create('notifications');

-- Activity log table (for persistence)
CREATE TABLE public.activity_logs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  entity_type TEXT NOT NULL,
  entity_id UUID NOT NULL,
  action TEXT NOT NULL,
  actor_id UUID REFERENCES auth.users(id),
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_activity_entity ON activity_logs(entity_type, entity_id);
CREATE INDEX idx_activity_actor ON activity_logs(actor_id);

Step 3: Producer - Trigger on Task Changes

-- supabase/migrations/20240101000011_activity_trigger.sql

CREATE OR REPLACE FUNCTION enqueue_activity_log()
RETURNS TRIGGER AS $$
DECLARE
  action_type TEXT;
  task_data JSONB;
BEGIN
  -- Determine action
  IF TG_OP = 'INSERT' THEN
    action_type := 'created';
    task_data := to_jsonb(NEW);
  ELSIF TG_OP = 'UPDATE' THEN
    action_type := 'updated';
    task_data := jsonb_build_object(
      'old', to_jsonb(OLD),
      'new', to_jsonb(NEW),
      'changes', (
        SELECT jsonb_object_agg(key, value)
        FROM jsonb_each(to_jsonb(NEW))
        WHERE to_jsonb(NEW) -> key != to_jsonb(OLD) -> key
      )
    );
  ELSIF TG_OP = 'DELETE' THEN
    action_type := 'deleted';
    task_data := to_jsonb(OLD);
  END IF;

  -- Enqueue activity
  PERFORM pgmq.send(
    'activity_log',
    jsonb_build_object(
      'entity_type', 'task',
      'entity_id', COALESCE(NEW.id, OLD.id),
      'action', action_type,
      'actor_id', auth.uid(),
      'data', task_data,
      'timestamp', NOW()
    )
  );

  -- Also enqueue notification for assignment changes
  IF TG_OP = 'UPDATE'
     AND OLD.assignee_id IS DISTINCT FROM NEW.assignee_id
     AND NEW.assignee_id IS NOT NULL THEN
    PERFORM pgmq.send(
      'notifications',
      jsonb_build_object(
        'type', 'task_assigned',
        'task_id', NEW.id,
        'task_title', NEW.title,
        'assignee_id', NEW.assignee_id,
        'assigner_id', auth.uid()
      )
    );
  END IF;

  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

CREATE TRIGGER on_task_change
  AFTER INSERT OR UPDATE OR DELETE ON tasks
  FOR EACH ROW
  EXECUTE FUNCTION enqueue_activity_log();

Step 4: Consumer - Edge Function

// supabase/functions/process-activity-queue/index.ts
import { serve } from 'https://deno.land/std@0.168.0/http/server.ts';
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2';

const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);

serve(async (req) => {
  try {
    // Read messages from queue
    const { data: messages, error: readError } = await supabase.rpc(
      'pgmq_read',
      {
        queue_name: 'activity_log',
        vt: 60, // 60 second visibility timeout
        qty: 10, // Process 10 at a time
      }
    );

    if (readError) throw readError;

    if (!messages || messages.length === 0) {
      return new Response(JSON.stringify({ processed: 0 }));
    }

    let processed = 0;

    for (const msg of messages) {
      try {
        const activity = msg.message;

        // Insert into activity_logs table
        const { error: insertError } = await supabase
          .from('activity_logs')
          .insert({
            entity_type: activity.entity_type,
            entity_id: activity.entity_id,
            action: activity.action,
            actor_id: activity.actor_id,
            metadata: activity.data,
          });

        if (insertError) throw insertError;

        // Delete processed message
        await supabase.rpc('pgmq_delete', {
          queue_name: 'activity_log',
          msg_id: msg.msg_id,
        });

        processed++;
      } catch (err) {
        console.error('Failed to process message:', msg.msg_id, err);
        // Message will be visible again after visibility timeout
      }
    }

    return new Response(JSON.stringify({ processed }));
  } catch (error) {
    console.error('Queue processing error:', error);
    return new Response(JSON.stringify({ error: error.message }), {
      status: 500,
    });
  }
});

Step 5: Schedule Consumer (pg_cron)

-- Run consumer every minute
SELECT cron.schedule(
  'process-activity-queue',
  '* * * * *',
  $$
  SELECT net.http_post(
    url := 'https://YOUR_PROJECT.supabase.co/functions/v1/process-activity-queue',
    headers := '{"Authorization": "Bearer YOUR_SERVICE_ROLE_KEY"}'::jsonb
  );
  $$
);

Part B: Cloudflare Queues

Step 1: Create Queue

# Create queue
wrangler queues create taskflow-notifications

# Create dead letter queue
wrangler queues create taskflow-notifications-dlq

Step 2: Configure wrangler.toml

# wrangler.toml
name = "taskflow"

# Queue producer binding
[[queues.producers]]
queue = "taskflow-notifications"
binding = "NOTIFICATION_QUEUE"

# Queue consumer
[[queues.consumers]]
queue = "taskflow-notifications"
max_batch_size = 10
max_retries = 3
dead_letter_queue = "taskflow-notifications-dlq"

Step 3: Producer - API Route

// src/app/api/tasks/[id]/assign/route.ts
import { createClient } from '@/lib/supabase/server';
import { NextResponse } from 'next/server';

export async function POST(
  request: Request,
  { params }: { params: { id: string } }
) {
  const supabase = await createClient();
  const { assignee_id } = await request.json();

  const { data: { user } } = await supabase.auth.getUser();

  // Update task
  const { data: task, error } = await supabase
    .from('tasks')
    .update({ assignee_id })
    .eq('id', params.id)
    .select('id, title')
    .single();

  if (error) {
    return NextResponse.json({ error: error.message }, { status: 400 });
  }

  // Enqueue notification via Cloudflare Queue
  // This is done through a separate worker or API call
  await fetch(`${process.env.WORKER_URL}/api/queue/notification`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      type: 'task_assigned',
      taskId: task.id,
      taskTitle: task.title,
      assigneeId: assignee_id,
      assignerId: user!.id,
    }),
  });

  return NextResponse.json({ task });
}

Step 4: Queue Worker (Producer endpoint)

// workers/queue-api/src/index.ts
import { Hono } from 'hono';

interface Env {
  NOTIFICATION_QUEUE: Queue;
}

const app = new Hono<{ Bindings: Env }>();

app.post('/api/queue/notification', async (c) => {
  const body = await c.req.json();

  await c.env.NOTIFICATION_QUEUE.send({
    ...body,
    timestamp: Date.now(),
  });

  return c.json({ queued: true });
});

export default app;

Step 5: Queue Consumer

// workers/notification-consumer/src/index.ts
import { createClient } from '@supabase/supabase-js';

interface Env {
  SUPABASE_URL: string;
  SUPABASE_SERVICE_KEY: string;
  SENDGRID_API_KEY: string;
}

interface NotificationMessage {
  type: string;
  taskId: string;
  taskTitle: string;
  assigneeId: string;
  assignerId: string;
  timestamp: number;
}

export default {
  async queue(
    batch: MessageBatch<NotificationMessage>,
    env: Env
  ): Promise<void> {
    const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_KEY);

    for (const message of batch.messages) {
      try {
        const notification = message.body;

        if (notification.type === 'task_assigned') {
          await handleTaskAssigned(notification, supabase, env);
        }

        message.ack();
      } catch (error) {
        console.error('Failed to process notification:', error);
        message.retry();
      }
    }
  },
};

async function handleTaskAssigned(
  notification: NotificationMessage,
  supabase: any,
  env: Env
) {
  // Get assignee email
  const { data: assignee } = await supabase
    .from('profiles')
    .select('email, full_name')
    .eq('id', notification.assigneeId)
    .single();

  if (!assignee) return;

  // Get assigner name
  const { data: assigner } = await supabase
    .from('profiles')
    .select('full_name')
    .eq('id', notification.assignerId)
    .single();

  // Send email via SendGrid
  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      personalizations: [
        { to: [{ email: assignee.email, name: assignee.full_name }] }
      ],
      from: { email: 'noreply@taskflow.app', name: 'TaskFlow' },
      subject: `Task assigned: ${notification.taskTitle}`,
      content: [
        {
          type: 'text/html',
          value: `
            <h2>You've been assigned a task</h2>
            <p><strong>${notification.taskTitle}</strong></p>
            <p>Assigned by: ${assigner?.full_name || 'Someone'}</p>
            <a href="https://taskflow.app/tasks/${notification.taskId}">View Task</a>
          `,
        },
      ],
    }),
  });

  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }

  // Also save to notifications table (in-app notification)
  await supabase.from('notifications').insert({
    user_id: notification.assigneeId,
    type: 'task_assigned',
    title: 'New task assigned',
    message: `You've been assigned "${notification.taskTitle}"`,
    data: {
      task_id: notification.taskId,
      assigner_id: notification.assignerId,
    },
  });
}

Step 6: Deploy Consumer

# Deploy queue consumer
cd workers/notification-consumer
wrangler deploy

Comparison Summary

When to use Supabase pgmq:

✅ Activity logging (needs database transaction)
✅ Complex queries before/after processing
✅ Data needs to stay in database
✅ Simple setup (no extra service)

❌ High throughput needed
❌ Complex retry logic

When to use Cloudflare Queues:

✅ Email/SMS notifications (external API)
✅ High throughput processing
✅ Built-in retry with DLQ
✅ Processing at edge

❌ Need database transaction
❌ Simple use case (overkill)

Verification Checklist

  • [ ] pgmq extension enabled
  • [ ] Activity queue created and working
  • [ ] Task changes trigger activity logging
  • [ ] Cloudflare Queue created
  • [ ] Notification producer working
  • [ ] Notification consumer processing messages
  • [ ] Email notifications sending (or simulated)

Common Issues

pgmq: Function not found

Error: function pgmq_read does not exist

Solution: Ensure pgmq extension is enabled and functions are exposed via RPC.

Cloudflare Queue: Message not acknowledged

Messages keep reappearing because message.ack() was not called.

Solution: Always call message.ack() after successful processing.


Next Phase

Chuyển sang Phase 6: Scheduled Jobs để implement cron jobs.