Best Practices & Patterns

Queue Best Practices

1. Idempotency

Ensure that a message can be processed multiple times without causing side effects.

// ❌ BAD: Not idempotent (read-modify-write)
async function processPayment(msg: Message) {
  const { data } = await supabase.from('balance')
    .select('amount').eq('user_id', msg.userId).single();
  await supabase.from('balance')
    .update({ amount: data.amount + msg.amount })  // Duplicate delivery = wrong balance!
    .eq('user_id', msg.userId);
}

// ✅ GOOD: Idempotent with deduplication
async function processPayment(msg: Message) {
  // Check if already processed
  const { data: existing } = await supabase
    .from('transactions')
    .select('id')
    .eq('idempotency_key', msg.idempotencyKey)
    .maybeSingle();  // returns null instead of an error when no row exists

  if (existing) {
    console.log('Already processed, skipping');
    return;
  }

  // Process and record
  await supabase.from('transactions').insert({
    idempotency_key: msg.idempotencyKey,
    user_id: msg.userId,
    amount: msg.amount,
    processed_at: new Date(),
  });
}
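
Note that the check-then-insert above still has a small race window: two workers can both pass the SELECT before either INSERT lands. A stricter variant, sketched below under the assumption that transactions.idempotency_key carries a UNIQUE constraint, lets the database arbitrate (23505 is Postgres's unique_violation code):

// ✅ ALTERNATIVE: database-enforced idempotency
async function processPaymentAtomic(msg: Message) {
  const { error } = await supabase.from('transactions').insert({
    idempotency_key: msg.idempotencyKey,
    user_id: msg.userId,
    amount: msg.amount,
  });

  if (error && error.code === '23505') {
    console.log('Already processed, skipping');  // another delivery won the race
    return;
  }
  if (error) throw error;  // real failure: let the message be retried
}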

2. Visibility Timeout

Set the visibility timeout to match the expected processing time.

// ✅ GOOD: Match timeout to expected processing time
const PROCESSING_TIME = 30; // seconds
const BUFFER = 1.5; // 50% buffer
const VISIBILITY_TIMEOUT = Math.ceil(PROCESSING_TIME * BUFFER);

// For Supabase pgmq
const { data: messages } = await supabase.rpc('pgmq_read', {
  queue_name: 'my_queue',
  vt: VISIBILITY_TIMEOUT,  // 45 seconds
  qty: 1,
});

// Cloudflare Queues exposes no per-message visibility timeout;
// batching/retry behaviour is configured in wrangler.toml instead:
// [queues.consumers]
// max_batch_timeout = 30  # seconds to wait while filling a batch
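
Since you cannot extend a message's invisibility on Cloudflare, a rough equivalent is to delay redelivery explicitly on failure. A sketch, assuming a hypothetical handle() function (Message#attempts and retry({ delaySeconds }) are part of the Queues runtime API):

// Back off failed messages instead of tuning a visibility timeout
export default {
  async queue(batch: MessageBatch<unknown>) {
    for (const msg of batch.messages) {
      try {
        await handle(msg.body);  // hypothetical business logic
        msg.ack();
      } catch {
        // exponential backoff: 10s, 20s, 40s, ... capped at 5 minutes
        msg.retry({ delaySeconds: Math.min(5 * 2 ** msg.attempts, 300) });
      }
    }
  },
};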

3. Dead Letter Queue

Handle failed messages properly.

// ✅ GOOD: DLQ pattern for Supabase
async function processWithDLQ(msg: Message) {
  const MAX_RETRIES = 3;

  try {
    await processMessage(msg);
    await supabase.rpc('pgmq_archive', {
      queue_name: 'main_queue',
      msg_id: msg.msg_id,
    });
  } catch (error) {
    if (msg.read_ct >= MAX_RETRIES) {
      // Move to DLQ
      await supabase.rpc('pgmq_send', {
        queue_name: 'dead_letter_queue',
        message: {
          original_message: msg,
          error: error instanceof Error ? error.message : String(error),
          failed_at: new Date().toISOString(),
        },
      });
      await supabase.rpc('pgmq_archive', {
        queue_name: 'main_queue',
        msg_id: msg.msg_id,
      });
    }
    // else: message will become visible again after timeout
  }
}
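
For completeness, a minimal driver loop for this handler, reusing the pgmq_read wrapper from section 2 (batch size and timeout values are illustrative):

// Poll the queue and feed each message through the DLQ-aware handler
async function pollOnce() {
  const { data: messages } = await supabase.rpc('pgmq_read', {
    queue_name: 'main_queue',
    vt: 45,   // visibility timeout, matching section 2
    qty: 10,  // read up to 10 messages per poll
  });

  for (const msg of messages ?? []) {
    await processWithDLQ(msg);
  }
}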

4. Batch Processing

Process multiple messages efficiently.

// ✅ GOOD: Batch processing in Cloudflare
export default {
  async queue(batch: MessageBatch<Email>) {
    // Group by type for efficient processing
    const byType = batch.messages.reduce((acc, msg) => {
      const type = msg.body.type;
      if (!acc[type]) acc[type] = [];
      acc[type].push(msg);
      return acc;
    }, {} as Record<string, Message<Email>[]>);

    // Process each type
    for (const [type, messages] of Object.entries(byType)) {
      try {
        await processBatch(type, messages);
        messages.forEach(m => m.ack());
      } catch (error) {
        // Retry all in batch
        messages.forEach(m => m.retry());
      }
    }
  }
};
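
A sketch of the processBatch helper the example assumes. The bulk endpoint URL is made up for illustration; the point is one provider call per type instead of one per message:

async function processBatch(type: string, messages: Message<Email>[]) {
  // one bulk request instead of messages.length individual sends
  const res = await fetch('https://api.example.com/emails/bulk', {  // hypothetical endpoint
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ type, emails: messages.map(m => m.body) }),
  });
  if (!res.ok) throw new Error(`Bulk send failed: ${res.status}`);  // triggers retry of the group
}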

5. Monitoring & Alerting

Track queue health.

-- Supabase: Create monitoring view (one row per monitored queue)
CREATE VIEW queue_health AS
SELECT
  'email_queue' AS queue_name,  -- pgmq queue tables have no queue_name column
  COUNT(*) AS pending_messages,
  MIN(enqueued_at) AS oldest_message,
  AVG(read_ct) AS avg_read_count,
  MAX(read_ct) AS max_read_count
FROM pgmq.q_email_queue;  -- Replace with your queue

-- Alert if queue is backing up
CREATE OR REPLACE FUNCTION check_queue_health()
RETURNS void AS $$
BEGIN
  IF (SELECT pending_messages FROM queue_health) > 1000 THEN
    -- Send alert (via Edge Function or http extension)
    PERFORM http_post('https://alerts.example.com/webhook', ...);
  END IF;
END;
$$ LANGUAGE plpgsql;
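
To run this check on a schedule, pg_cron (used again in Pattern 3 below) can call it directly; the job name and interval here are placeholders:

SELECT cron.schedule('queue-health-check', '*/5 * * * *', 'SELECT check_queue_health()');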

Cron Best Practices

1. Distributed Locking

Prevent duplicate execution.

-- Supabase: Advisory lock pattern
CREATE OR REPLACE FUNCTION exclusive_job()
RETURNS void AS $$
BEGIN
  -- Try to acquire lock (non-blocking)
  IF NOT pg_try_advisory_lock(hashtext('my_job_name')) THEN
    RAISE NOTICE 'Job already running, skipping';
    RETURN;
  END IF;

  BEGIN
    -- Your job logic here
    PERFORM do_actual_work();
  EXCEPTION WHEN OTHERS THEN
    -- Release lock on error
    PERFORM pg_advisory_unlock(hashtext('my_job_name'));
    RAISE;
  END;

  -- Release lock when done
  PERFORM pg_advisory_unlock(hashtext('my_job_name'));
END;
$$ LANGUAGE plpgsql;

// Cloudflare: Use KV for locking
export default {
  async scheduled(event: ScheduledEvent, env: Env) {
    const lockKey = `lock:${event.cron}`;
    const lockValue = Date.now().toString();

    // Try to acquire lock
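    // (KV is eventually consistent: two runs in different locations can both
    //  miss the key, so this is best-effort dedup, not a strict mutex;
    //  use Durable Objects if you need a real one)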
    const existing = await env.KV.get(lockKey);
    if (existing) {
      console.log('Job already running');
      return;
    }

    // Set lock with TTL
    await env.KV.put(lockKey, lockValue, { expirationTtl: 300 });

    try {
      await doWork();
    } finally {
      // Release lock
      await env.KV.delete(lockKey);
    }
  }
};

2. Timeout Handling

Handle long-running jobs properly.

-- Supabase: Set statement timeout
CREATE OR REPLACE FUNCTION long_running_job()
RETURNS void AS $$
BEGIN
  -- Set timeout for this session
  SET LOCAL statement_timeout = '5min';

  -- Your long operation
  PERFORM heavy_processing();

EXCEPTION WHEN query_canceled THEN
  -- Log timeout
  INSERT INTO job_logs (job_name, status, message)
  VALUES ('long_running_job', 'timeout', 'Job exceeded 5 minute limit');
END;
$$ LANGUAGE plpgsql;

// Cloudflare: Handle timeout gracefully
export default {
  async scheduled(event: ScheduledEvent, env: Env) {
    const startTime = Date.now();
    const MAX_RUNTIME = 25000; // 25 seconds (leave buffer)

    const items = await getItemsToProcess();

    for (const item of items) {
      if (Date.now() - startTime > MAX_RUNTIME) {
        console.log('Approaching timeout, stopping early');
        // Save progress for next run
        await saveProgress(item.id);
        break;
      }

      await processItem(item);
    }
  }
};
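
The saveProgress/getItemsToProcess helpers are left abstract above. One way to back them, sketched with a KV cursor and a hypothetical items table (the table name, cursor key, 500-row page size, and the env parameter threaded through are all assumptions):

// Resume from where the previous run stopped, using a KV-stored cursor
async function getItemsToProcess(env: Env) {
  const cursor = (await env.KV.get('digest:cursor')) ?? '';
  const { data } = await supabase
    .from('items')
    .select('*')
    .gt('id', cursor)
    .order('id')
    .limit(500);
  return data ?? [];
}

async function saveProgress(env: Env, lastId: string) {
  await env.KV.put('digest:cursor', lastId);
}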

3. Job Logging

Track execution history.

-- Supabase: Comprehensive job logging
CREATE TABLE job_logs (
  id SERIAL PRIMARY KEY,
  job_name TEXT NOT NULL,
  started_at TIMESTAMPTZ DEFAULT NOW(),
  completed_at TIMESTAMPTZ,
  status TEXT NOT NULL DEFAULT 'running',
  rows_affected INTEGER,
  error_message TEXT,
  duration_ms INTEGER GENERATED ALWAYS AS (
    -- EXTRACT(MILLISECONDS ...) only covers the seconds field; use EPOCH for total duration
    (EXTRACT(EPOCH FROM (completed_at - started_at)) * 1000)::INTEGER
  ) STORED
);

CREATE OR REPLACE FUNCTION logged_job()
RETURNS void AS $$
DECLARE
  log_id INTEGER;
  affected INTEGER;
BEGIN
  -- Start log
  INSERT INTO job_logs (job_name, status)
  VALUES ('my_job', 'running')
  RETURNING id INTO log_id;

  -- Do work
  DELETE FROM old_data WHERE created_at < NOW() - INTERVAL '30 days';
  GET DIAGNOSTICS affected = ROW_COUNT;

  -- Complete log
  UPDATE job_logs SET
    status = 'completed',
    completed_at = NOW(),
    rows_affected = affected
  WHERE id = log_id;

EXCEPTION WHEN OTHERS THEN
  UPDATE job_logs SET
    status = 'failed',
    completed_at = NOW(),
    error_message = SQLERRM
  WHERE id = log_id;
  RAISE;
END;
$$ LANGUAGE plpgsql;
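
And schedule it like any other pg_cron job (name and schedule are illustrative):

SELECT cron.schedule('logged-job-hourly', '0 * * * *', 'SELECT logged_job()');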

4. Avoid Long-running Cron Jobs

Split into Queue jobs for heavy work.

// ❌ BAD: Long cron job
export default {
  async scheduled() {
    const users = await getAllUsers(); // 100,000 users
    for (const user of users) {
      await sendEmail(user); // This will timeout!
    }
  }
};

// ✅ GOOD: Cron triggers queue
export default {
  async scheduled(event: ScheduledEvent, env: Env) {
    const users = await getUsersBatch(1000); // Get batch

    // Enqueue in chunks: sendBatch accepts at most 100 messages per call
    for (let i = 0; i < users.length; i += 100) {
      await env.EMAIL_QUEUE.sendBatch(
        users.slice(i, i + 100).map(u => ({
          body: { userId: u.id, type: 'digest' },
        }))
      );
    }
  }
};

Common Patterns

Pattern 1: Saga / Choreography

Use queues for multi-step workflows.

┌──────────┐     ┌──────────┐     ┌──────────┐
│  Order   │────▶│ Payment  │────▶│ Shipping │
│  Queue   │     │  Queue   │     │  Queue   │
└──────────┘     └──────────┘     └──────────┘
     │                │                │
     ▼                ▼                ▼
 Create order    Charge card     Ship items
     │                │                │
     └────────────────┴────────────────┘
                Compensation on failure
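
As a sketch of one hop in that chain, here is what the Payment stage might look like on Cloudflare Queues. The Order type, the queue bindings (SHIPPING_QUEUE, COMPENSATE_QUEUE), and chargeCard are assumptions for illustration:

// Payment consumer: forward on success, emit a compensation event on failure
export default {
  async queue(batch: MessageBatch<Order>, env: Env) {
    for (const msg of batch.messages) {
      try {
        await chargeCard(msg.body);               // this stage's work
        await env.SHIPPING_QUEUE.send(msg.body);  // hand off to the next stage
        msg.ack();
      } catch (error) {
        // choreography-style compensation: tell the Order stage to undo itself
        await env.COMPENSATE_QUEUE.send({
          orderId: msg.body.orderId,
          failedStep: 'payment',
          reason: error instanceof Error ? error.message : String(error),
        });
        msg.ack();  // handled via compensation, not redelivery
      }
    }
  }
};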

Pattern 2: Fan-out

One message triggers multiple processes.

// Producer: Single event
await queue.send({
  type: 'user.created',
  userId: newUser.id,
});

// Consumers: Multiple handlers
const handlers: Record<string, Array<(body: any) => void | Promise<void>>> = {
  'user.created': [
    sendWelcomeEmail,
    createDefaultSettings,
    notifyAdmins,
    trackAnalytics,
  ],
};

export default {
  async queue(batch: MessageBatch<any>) {
    for (const msg of batch.messages) {
      const fns = handlers[msg.body.type] || [];
      try {
        await Promise.all(fns.map(fn => fn(msg.body)));
        msg.ack();
      } catch (error) {
        msg.retry();  // retry re-runs ALL handlers, so keep them idempotent
      }
    }
  }
};

Pattern 3: Scheduled Queue Processing

Cron triggers batch queue processing.

-- pg_cron: Schedule batch processing
SELECT cron.schedule(
  'process-pending-exports',
  '*/15 * * * *',  -- Every 15 minutes
  $$
    -- Move pending to queue
    WITH pending AS (
      UPDATE exports
      SET status = 'queued'
      WHERE status = 'pending'
      AND created_at < NOW() - INTERVAL '1 minute'
      RETURNING *
    )
    SELECT pgmq.send('export_queue', to_jsonb(pending))
    FROM pending;
  $$
);

Anti-patterns

❌ Fire-and-forget without confirmation

// ❌ BAD
await queue.send(message);
return Response.json({ status: 'sent' });
// What if queue.send failed?

// ✅ GOOD
try {
  const msgId = await queue.send(message);
  return Response.json({ status: 'queued', msgId });
} catch (error) {
  return Response.json({ status: 'failed' }, { status: 500 });
}

❌ No error handling in consumers

// ❌ BAD
async queue(batch: MessageBatch) {
  for (const msg of batch.messages) {
    await process(msg);  // If this throws, the rest of the batch is abandoned and redelivered!
    msg.ack();
  }
}

// ✅ GOOD
async queue(batch: MessageBatch) {
  for (const msg of batch.messages) {
    try {
      await process(msg);
      msg.ack();
    } catch (error) {
      console.error('Failed:', error);
      msg.retry();
    }
  }
}

❌ Polling instead of Queue

// ❌ BAD: Polling every minute
async scheduled() {
  const pending = await db.from('tasks').select('*').eq('status', 'pending');
  for (const task of pending) {
    await process(task);
  }
}

// ✅ GOOD: Trigger-based queue
// On INSERT to tasks
await queue.send({ taskId: newTask.id });
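
A sketch of what "On INSERT to tasks" can look like with a pgmq-backed trigger (table and queue names are placeholders):

-- Enqueue each new task from a trigger so work starts immediately
CREATE OR REPLACE FUNCTION enqueue_task() RETURNS trigger AS $$
BEGIN
  PERFORM pgmq.send('task_queue', to_jsonb(NEW));
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tasks_enqueue
AFTER INSERT ON tasks
FOR EACH ROW EXECUTE FUNCTION enqueue_task();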

Summary

Queue Best Practices

  1. Idempotency - Safe to retry
  2. Visibility timeout - Match processing time
  3. Dead letter queue - Handle failures
  4. Batch processing - Efficiency
  5. Monitoring - Track health

Cron Best Practices

  1. Distributed locking - Prevent duplicates
  2. Timeout handling - Graceful stops
  3. Job logging - Track execution
  4. Split heavy work - Use queues
  5. Meaningful names - Easy debugging

Golden Rules

  • Always handle errors explicitly
  • Log everything important
  • Monitor queue depth
  • Test failure scenarios

Q&A

  1. Which of these patterns are already in use in the project?
  2. Are there anti-patterns we are currently committing?
  3. What does our current monitoring look like?