Best Practices & Patterns
Queue Best Practices
1. Idempotency
Đảm bảo message có thể được xử lý nhiều lần mà không gây side effects, vì queue có thể giao lại cùng một message.
// ❌ BAD: Not idempotent
// Adds msg.amount to the stored amount on every call, so a redelivered
// message is applied a second time.
// NOTE(review): `balance` is not defined anywhere in this snippet —
// presumably a previously read balance; confirm before reusing.
async function processPayment(msg: Message) {
await supabase.from('balance')
.update({ amount: balance + msg.amount }) // Duplicate = wrong balance!
.eq('user_id', msg.userId);
}
// ✅ GOOD: Idempotent with deduplication
/**
 * Records a payment at most once per idempotency key.
 *
 * The lookup is only a cheap fast path: two consumers can both pass it before
 * either has inserted. The real guarantee comes from a UNIQUE constraint on
 * `transactions.idempotency_key`, so a unique-violation on insert is treated
 * as "already processed" rather than as a failure.
 *
 * @param msg Queue message carrying `idempotencyKey`, `userId` and `amount`.
 * @throws Error when the lookup or the insert fails for any reason other than
 *   a duplicate key, so the message is retried instead of silently dropped.
 */
async function processPayment(msg: Message) {
  // Check if already processed. maybeSingle() yields `data: null` for "no
  // row" instead of reporting it as an error the way single() does.
  const { data: existing, error: lookupError } = await supabase
    .from('transactions')
    .select('id')
    .eq('idempotency_key', msg.idempotencyKey)
    .limit(1)
    .maybeSingle();

  if (lookupError) {
    throw new Error(`Idempotency lookup failed: ${lookupError.message}`);
  }
  if (existing) {
    console.log('Already processed, skipping');
    return;
  }

  // Process and record
  const { error: insertError } = await supabase.from('transactions').insert({
    idempotency_key: msg.idempotencyKey,
    user_id: msg.userId,
    amount: msg.amount,
    processed_at: new Date().toISOString(),
  });

  if (!insertError) {
    return;
  }

  // 23505 is PostgreSQL's unique_violation: a concurrent consumer recorded
  // the same key between our lookup and our insert.
  if (insertError.code === '23505') {
    console.log('Already processed, skipping');
    return;
  }

  throw new Error(`Failed to record transaction: ${insertError.message}`);
}
2. Visibility Timeout
Đặt visibility timeout phù hợp với processing time: quá ngắn thì message bị xử lý trùng, quá dài thì message lỗi được retry chậm.
// ✅ GOOD: Match timeout to expected processing time
const PROCESSING_TIME = 30; // seconds
const BUFFER = 1.5; // 50% buffer
const VISIBILITY_TIMEOUT = Math.ceil(PROCESSING_TIME * BUFFER);

// For Supabase pgmq
// rpc() resolves to a { data, error } wrapper instead of throwing, so unwrap
// it: `messages` is the returned rows, and a failed read is raised explicitly.
const { data: messages, error: readError } = await supabase.rpc('pgmq_read', {
  queue_name: 'my_queue',
  vt: VISIBILITY_TIMEOUT, // 45 seconds
  qty: 1,
});
if (readError) {
  throw new Error(`pgmq_read failed: ${readError.message}`);
}

// For Cloudflare Queue (configured in wrangler.toml)
// NOTE(review): max_batch_timeout is how long the consumer waits for a batch
// to fill, not a per-message visibility timeout — confirm against the
// Cloudflare Queues configuration docs before treating the two as equivalent.
// [queues.consumers]
// max_batch_timeout = 30
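3. Dead Letter Queue
Handle failed messages properly: after a bounded number of retries, move the message to a separate queue instead of retrying forever.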
// ✅ GOOD: DLQ pattern for Supabase
/**
 * Processes one pgmq message and routes it to the dead letter queue once it
 * has been read `MAX_RETRIES` times without succeeding.
 *
 * - Success: the message is archived from `main_queue`.
 * - Failure below the retry limit: nothing is done; the message becomes
 *   visible again when its visibility timeout expires.
 * - Failure at or above the limit: the message is copied to
 *   `dead_letter_queue` and archived from `main_queue` only after that copy
 *   succeeded, so a failed DLQ write never loses the message.
 *
 * rpc() reports failures through the returned `error` field, so each call is
 * checked explicitly. This function logs those failures and does not throw.
 *
 * @param msg Message read from `main_queue`; `msg_id` and `read_ct` are used.
 */
async function processWithDLQ(msg: Message) {
  const MAX_RETRIES = 3;

  // Removes the message from the main queue, logging (not throwing) on error.
  const archiveFromMainQueue = async () => {
    const { error: archiveError } = await supabase.rpc('pgmq_archive', {
      queue_name: 'main_queue',
      msg_id: msg.msg_id,
    });
    if (archiveError) {
      console.error(`Failed to archive message ${msg.msg_id}:`, archiveError.message);
    }
  };

  try {
    await processMessage(msg);
    await archiveFromMainQueue();
  } catch (error: unknown) {
    if (msg.read_ct < MAX_RETRIES) {
      // Message will become visible again after the visibility timeout.
      return;
    }

    // Anything can be thrown, so only read `.message` from real Errors.
    const reason = error instanceof Error ? error.message : String(error);

    // Move to DLQ
    const { error: dlqError } = await supabase.rpc('pgmq_send', {
      queue_name: 'dead_letter_queue',
      message: {
        original_message: msg,
        error: reason,
        failed_at: new Date().toISOString(),
      },
    });
    if (dlqError) {
      // Keep the message in the main queue; archiving now would lose it.
      console.error(`Failed to move message ${msg.msg_id} to DLQ:`, dlqError.message);
      return;
    }

    await archiveFromMainQueue();
  }
}
4. Batch Processing
Process multiple messages efficiently by grouping them and handling each group in one call.
// ✅ GOOD: Batch processing in Cloudflare
export default {
  /**
   * Groups the batch by `body.type`, processes each group with a single
   * `processBatch` call, then acks the group on success or retries every
   * message in it on failure. Groups are independent: one failing type does
   * not affect the others.
   *
   * @param batch Messages delivered by the queue in this invocation.
   */
  async queue(batch: MessageBatch<Email>) {
    // Group by type for efficient processing. A Map is used because the keys
    // come from message data; a plain object would misbehave for types such
    // as "constructor" or "__proto__".
    const byType = new Map<string, (typeof batch.messages)[number][]>();
    for (const msg of batch.messages) {
      const group = byType.get(msg.body.type);
      if (group) {
        group.push(msg);
      } else {
        byType.set(msg.body.type, [msg]);
      }
    }

    // Process each type
    for (const [type, messages] of byType) {
      try {
        await processBatch(type, messages);
        messages.forEach((m) => m.ack());
      } catch (error) {
        // Retry all in batch, and leave a trace of why.
        console.error(
          `Batch of type "${type}" failed; retrying ${messages.length} message(s):`,
          error,
        );
        messages.forEach((m) => m.retry());
      }
    }
  },
};
5. Monitoring & Alerting
Track queue health: queue depth, age of the oldest message, and retry counts.
-- Supabase: Create monitoring view
-- One row describing the backlog of a single pgmq queue.
-- pgmq queue tables (pgmq.q_<queue>) do not carry a queue_name column, so the
-- name is emitted as a literal. Without GROUP BY the aggregates always yield
-- exactly one row, with pending_messages = 0 when the queue is empty.
CREATE VIEW queue_health AS
SELECT
  'email_queue'::text AS queue_name,   -- Replace with your queue
  COUNT(*)            AS pending_messages,
  MIN(enqueued_at)    AS oldest_message,
  AVG(read_ct)        AS avg_read_count,
  MAX(read_ct)        AS max_read_count
FROM pgmq.q_email_queue;               -- Replace with your queue
-- Alert if queue is backing up
-- Posts one webhook alert per queue in queue_health whose backlog exceeds
-- 1000 pending messages. Iterating over the view, rather than using a scalar
-- subquery, keeps this working if the view ever returns zero or several rows.
-- Requires the http extension (http_post(uri, content, content_type));
-- replace the call with an Edge Function invocation if you alert that way.
CREATE OR REPLACE FUNCTION check_queue_health()
RETURNS void AS $$
DECLARE
  backlog RECORD;
BEGIN
  FOR backlog IN
    SELECT queue_name, pending_messages
    FROM queue_health
    WHERE pending_messages > 1000
  LOOP
    PERFORM http_post(
      'https://alerts.example.com/webhook',
      json_build_object(
        'queue', backlog.queue_name,
        'pending_messages', backlog.pending_messages
      )::text,
      'application/json'
    );
  END LOOP;
END;
$$ LANGUAGE plpgsql;
Cron Best Practices
1. Distributed Locking
Prevent duplicate execution when a new run starts before the previous one has finished.
-- Supabase: Advisory lock pattern
-- Runs do_actual_work() unless another transaction is already running it.
--
-- Uses a transaction-scoped advisory lock, which PostgreSQL releases
-- automatically when the surrounding transaction commits or rolls back.
-- A session-level lock with manual unlocking is not safe here: in PL/pgSQL,
-- EXCEPTION WHEN OTHERS does not trap query_canceled, so a cancelled or
-- timed-out run would skip the unlock and leave the lock held for as long as
-- the session lives.
--
-- Note: the lock is held until the end of the calling transaction, not just
-- until this function returns.
CREATE OR REPLACE FUNCTION exclusive_job()
RETURNS void AS $$
BEGIN
  -- Try to acquire lock (non-blocking)
  IF NOT pg_try_advisory_xact_lock(hashtext('my_job_name')) THEN
    RAISE NOTICE 'Job already running, skipping';
    RETURN;
  END IF;

  -- Your job logic here. Errors propagate to the caller unchanged; the lock
  -- is dropped when the transaction ends either way.
  PERFORM do_actual_work();
END;
$$ LANGUAGE plpgsql;
// Cloudflare: Use KV for locking
// Best-effort guard against overlapping runs of the same cron expression:
// skip if a lock key exists, otherwise write one with a 300-second TTL, run
// the job, and delete the key afterwards.
// NOTE(review): the get() and put() below are two separate calls, so two
// invocations can both observe "no lock" and both proceed. KV is also
// documented as eventually consistent — confirm whether this guard is strong
// enough for the job, or whether a Durable Object is needed.
export default {
async scheduled(event: ScheduledEvent, env: Env) {
// One lock per cron expression.
const lockKey = `lock:${event.cron}`;
// Acquisition timestamp; it is stored as the lock value but never read back.
const lockValue = Date.now().toString();
// Try to acquire lock
const existing = await env.KV.get(lockKey);
if (existing) {
console.log('Job already running');
return;
}
// Set lock with TTL so a crashed run cannot hold the lock forever
await env.KV.put(lockKey, lockValue, { expirationTtl: 300 });
try {
await doWork();
} finally {
// Release lock, whether doWork() succeeded or threw.
// NOTE(review): if doWork() outlives the 300s TTL, this may delete a lock
// written by a later run — confirm the job's maximum runtime.
await env.KV.delete(lockKey);
}
}
};
2. Timeout Handling
Handle long-running jobs properly: bound their runtime and stop gracefully before the platform kills them.
-- Supabase: Set statement timeout
-- Runs heavy_processing() under a 5 minute statement timeout and records a
-- 'timeout' row in job_logs when the work is cancelled. A trapped
-- cancellation rolls back the work done inside this block, and the function
-- then returns normally.
CREATE OR REPLACE FUNCTION long_running_job()
RETURNS void AS $$
BEGIN
  -- Set timeout for the rest of the current transaction (SET LOCAL is
  -- transaction-scoped, not session-scoped).
  -- NOTE(review): statement_timeout is presumably armed when the calling
  -- top-level statement starts, so changing it here may not bound this very
  -- call — confirm, and consider setting it in the caller instead
  -- (e.g. `SET statement_timeout = '5min'; SELECT long_running_job();`).
  SET LOCAL statement_timeout = '5min';

  -- Your long operation
  PERFORM heavy_processing();
EXCEPTION WHEN query_canceled THEN
  -- Log timeout. job_logs stores the text in error_message (it has no
  -- "message" column), and completed_at is filled in so the row is closed.
  INSERT INTO job_logs (job_name, status, completed_at, error_message)
  VALUES ('long_running_job', 'timeout', clock_timestamp(), 'Job exceeded 5 minute limit');
END;
$$ LANGUAGE plpgsql;
// Cloudflare: Handle timeout gracefully
export default {
  /**
   * Works through the pending items one at a time and stops as soon as the
   * run has used more than its time budget, recording where it stopped so a
   * later run can continue from there.
   */
  async scheduled(event: ScheduledEvent, env: Env) {
    const MAX_RUNTIME = 25000; // 25 seconds (leave buffer)
    const startTime = Date.now();

    const pendingItems = await getItemsToProcess();

    for (const current of pendingItems) {
      const elapsedMs = Date.now() - startTime;
      const overBudget = elapsedMs > MAX_RUNTIME;

      if (overBudget) {
        console.log('Approaching timeout, stopping early');
        // Remember the first unprocessed item for the next run.
        await saveProgress(current.id);
        break;
      }

      await processItem(current);
    }
  },
};
3. Job Logging
Track execution history: start time, end time, status, rows affected, and any error.
-- Supabase: Comprehensive job logging
-- One row per job run. duration_ms is derived from the two timestamps and
-- stays NULL while completed_at is NULL (job still running).
CREATE TABLE job_logs (
  id SERIAL PRIMARY KEY,
  job_name TEXT NOT NULL,
  -- clock_timestamp() is the wall-clock time of the insert. NOW() would be
  -- the transaction start time, which is identical for every statement in a
  -- job that runs inside a single transaction.
  started_at TIMESTAMPTZ DEFAULT clock_timestamp(),
  completed_at TIMESTAMPTZ,
  status TEXT NOT NULL DEFAULT 'running',
  rows_affected INTEGER,
  error_message TEXT,
  -- EXTRACT(EPOCH ...) is the whole interval in seconds. EXTRACT(MILLISECONDS
  -- ...) would return only the seconds field (0-59.999 s), so a job of 2 min
  -- 5 s would be reported as 5000 ms.
  duration_ms INTEGER GENERATED ALWAYS AS (
    (EXTRACT(EPOCH FROM (completed_at - started_at)) * 1000)::INTEGER
  ) STORED
);
-- Runs the cleanup job and records its outcome in job_logs.
--
-- The work lives in its own inner block so that a failure rolls back only the
-- work. The 'running' row inserted beforehand survives and can be updated to
-- 'failed'. With a single outer EXCEPTION clause that insert would be rolled
-- back too, and the failure UPDATE would match no row.
--
-- The error is reported with RAISE WARNING instead of being re-raised:
-- re-raising aborts the calling transaction and would discard the 'failed'
-- row. Callers that need the failure must check job_logs.status.
--
-- clock_timestamp() is used for both timestamps because NOW() is fixed at
-- transaction start and would make every duration zero.
CREATE OR REPLACE FUNCTION logged_job()
RETURNS void AS $$
DECLARE
  log_id INTEGER;
  affected INTEGER;
BEGIN
  -- Start log
  INSERT INTO job_logs (job_name, status, started_at)
  VALUES ('my_job', 'running', clock_timestamp())
  RETURNING id INTO log_id;

  BEGIN
    -- Do work
    DELETE FROM old_data WHERE created_at < NOW() - INTERVAL '30 days';
    GET DIAGNOSTICS affected = ROW_COUNT;
  EXCEPTION WHEN OTHERS THEN
    UPDATE job_logs SET
      status = 'failed',
      completed_at = clock_timestamp(),
      error_message = SQLERRM
    WHERE id = log_id;
    RAISE WARNING 'my_job failed: %', SQLERRM;
    RETURN;
  END;

  -- Complete log
  UPDATE job_logs SET
    status = 'completed',
    completed_at = clock_timestamp(),
    rows_affected = affected
  WHERE id = log_id;
END;
$$ LANGUAGE plpgsql;
4. Avoid Long-running Cron Jobs
Split heavy work into queue jobs: the cron job only enqueues, and consumers do the processing.
// ❌ BAD: Long cron job
// Loads every user and sends the emails one after another inside the
// scheduled handler itself, so the run time grows with the number of users.
export default {
async scheduled() {
const users = await getAllUsers(); // 100,000 users
for (const user of users) {
await sendEmail(user); // This will timeout!
}
}
};
// ✅ GOOD: Cron triggers queue
export default {
  /**
   * Fetches one batch of users and enqueues a digest job per user; the email
   * work itself happens in the queue consumer.
   *
   * Messages are sent in chunks because Cloudflare Queues limits how many
   * messages one sendBatch() call accepts (100 at the time of writing).
   * An empty user list results in no sendBatch() call at all.
   */
  async scheduled(event: ScheduledEvent, env: Env) {
    const MAX_MESSAGES_PER_SEND = 100;
    const users = await getUsersBatch(1000); // Get batch

    // Enqueue for processing
    for (let start = 0; start < users.length; start += MAX_MESSAGES_PER_SEND) {
      const chunk = users.slice(start, start + MAX_MESSAGES_PER_SEND);
      await env.EMAIL_QUEUE.sendBatch(
        chunk.map((u) => ({
          body: { userId: u.id, type: 'digest' },
        })),
      );
    }
  },
};
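Common Patterns
Pattern 1: Saga / Choreography
Use queues for multi-step workflows: each step consumes from its own queue, and a failure triggers compensation for the steps already completed.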
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Order │────▶│ Payment │────▶│ Shipping │
│ Queue │ │ Queue │ │ Queue │
└──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
Create order Charge card Ship items
│ │ │
└────────────────┴────────────────┘
│
Compensation on failure
Pattern 2: Fan-out
One message triggers multiple independent processes.
// Producer: publish a single event describing what happened; the producer
// does not know or care which handlers react to it.
const userCreatedEvent = {
  type: 'user.created',
  userId: newUser.id,
};
await queue.send(userCreatedEvent);
// Consumers: Multiple handlers
// Maps an event type to every function that should run for it.
const handlers = {
  'user.created': [
    sendWelcomeEmail,
    createDefaultSettings,
    notifyAdmins,
    trackAnalytics,
  ],
};

export default {
  /**
   * Runs every handler registered for a message's type, then acks the
   * message if all of them succeeded or retries it if any failed.
   *
   * Promise.allSettled is used so that every handler runs to completion and
   * each failure is logged; Promise.all would reject on the first failure and
   * the thrown error would abort the rest of the batch. A retried message
   * re-runs ALL of its handlers, so handlers must be idempotent.
   *
   * Messages whose type has no registered handlers are acked unchanged.
   */
  async queue(batch: MessageBatch) {
    for (const msg of batch.messages) {
      const fns = handlers[msg.body.type] || [];

      // The async wrapper turns a synchronous throw into a rejection too.
      const outcomes = await Promise.allSettled(
        fns.map(async (fn) => fn(msg.body)),
      );
      const failures = outcomes.filter(
        (outcome): outcome is PromiseRejectedResult => outcome.status === 'rejected',
      );

      if (failures.length === 0) {
        msg.ack();
        continue;
      }

      for (const failure of failures) {
        console.error(`Handler for "${msg.body.type}" failed:`, failure.reason);
      }
      msg.retry();
    }
  },
};
Pattern 3: Scheduled Queue Processing
A cron job periodically moves pending rows into a queue for batch processing.
-- pg_cron: Schedule batch processing
-- Registers a job named 'process-pending-exports'. The scheduled command
-- marks exports that are still 'pending' and older than one minute as
-- 'queued', and sends each updated row to 'export_queue' as JSONB.
-- The UPDATE and the sends are a single statement (data-modifying CTE), so
-- only rows returned by the UPDATE are enqueued.
SELECT cron.schedule(
'process-pending-exports',
'*/15 * * * *', -- Every 15 minutes
$$
-- Move pending to queue
WITH pending AS (
UPDATE exports
SET status = 'queued'
WHERE status = 'pending'
AND created_at < NOW() - INTERVAL '1 minute'
RETURNING *
)
SELECT pgmq.send('export_queue', to_jsonb(pending))
FROM pending;
$$
);
Anti-patterns
❌ Fire-and-forget without confirmation
// ❌ BAD
// The response claims success, but nothing here handles a rejected
// queue.send(), so the caller never gets a controlled failure response.
await queue.send(message);
return Response.json({ status: 'sent' });
// What if queue.send failed?
// ✅ GOOD
// Confirm the enqueue before answering, and turn a failed send into an
// explicit 500 that is also visible in the logs.
try {
  // NOTE(review): whether send() resolves to a message id depends on the
  // queue client in use — confirm, otherwise msgId is undefined here.
  const msgId = await queue.send(message);
  return Response.json({ status: 'queued', msgId });
} catch (error: unknown) {
  // Keep the cause; the client only sees a generic failure.
  console.error('Failed to enqueue message:', error);
  return Response.json({ status: 'failed' }, { status: 500 });
}
❌ No error handling in consumers
// ❌ BAD
// A throw from process() escapes the handler: the failing message is never
// acked or retried explicitly, and the remaining messages are not processed
// in this invocation.
async queue(batch: MessageBatch) {
for (const msg of batch.messages) {
// If this throws, the loop aborts with no explicit ack/retry decision.
// NOTE(review): the original comment said "message lost"; on Cloudflare
// Queues an uncaught error presumably causes un-acked messages to be
// redelivered instead — confirm for your platform.
await process(msg);
msg.ack();
}
}
// ✅ GOOD
// Each message gets its own outcome: acknowledged when processing succeeds,
// logged and scheduled for retry when it fails. One bad message never stops
// the rest of the batch.
async queue(batch: MessageBatch) {
  const { messages } = batch;

  for (const message of messages) {
    try {
      await process(message);
      message.ack();
    } catch (failure) {
      console.error('Failed:', failure);
      message.retry();
    }
  }
}
❌ Polling instead of Queue
// ❌ BAD: Polling every minute
// Queries the tasks table for pending rows on each scheduled run and
// processes them sequentially.
async scheduled() {
// NOTE(review): if `db` is a Supabase client this resolves to a
// { data, error } wrapper rather than an array of rows — confirm.
const pending = await db.from('tasks').select('*').eq('status', 'pending');
for (const task of pending) {
await process(task);
}
}
// ✅ GOOD: Trigger-based queue
// Enqueue the task id at the moment the row is inserted into tasks, so a
// consumer picks it up without any table scan.
const { id: taskId } = newTask;
await queue.send({ taskId });
Tổng kết
Queue Best Practices
- Idempotency - Safe to retry
- Visibility timeout - Match processing time
- Dead letter queue - Handle failures
- Batch processing - Efficiency
- Monitoring - Track health
Cron Best Practices
- Distributed locking - Prevent duplicates
- Timeout handling - Graceful stops
- Job logging - Track execution
- Split heavy work - Use queues
- Meaningful names - Easy debugging
Golden Rules
- Always handle errors explicitly
- Log everything important
- Monitor queue depth
- Test failure scenarios
Q&A
- Patterns nào đang dùng trong dự án?
- Có anti-patterns nào đang mắc phải?
- Monitoring hiện tại như thế nào?