Cloudflare Queues Concepts
Queues là gì?
Message Queue Pattern
┌─────────────────────────────────────────────────────────────┐
│ CLOUDFLARE QUEUES │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer Queue Consumer │
│ (Worker A) (Worker B) │
│ │ │ │
│ │ ┌─────────────────────────────┐ │ │
│ └───▶│ msg1 msg2 msg3 msg4 │──────────▶│ │
│ └─────────────────────────────┘ │ │
│ │
│ Features: │
│ - Guaranteed delivery │
│ - At-least-once semantics │
│ - Automatic retries │
│ - Batch processing │
│ - Dead letter queue │
│ │
└─────────────────────────────────────────────────────────────┘
When to Use Queues
Use Cases
✅ GOOD for:
- Email/notification sending
- Image/video processing
- Webhook delivery
- Log aggregation
- Order processing
- Any async task
❌ NOT for:
- Real-time communication (use WebSocket)
- Request-response patterns (use direct calls)
- Time-sensitive data (has latency)
Create Queue
Via Wrangler
# Create queue
wrangler queues create my-queue
# List queues
wrangler queues list
# Delete queue
wrangler queues delete my-queue
Configuration
# wrangler.toml
# Producer binding (send messages)
[[queues.producers]]
queue = "my-queue"
binding = "MY_QUEUE"
# Consumer binding (receive messages)
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10 # 1-100 messages per batch
max_batch_timeout = 30 # Max seconds to wait for batch
max_retries = 3 # Retry count before dead letter
dead_letter_queue = "my-dlq" # Optional DLQ
Producer (Sending Messages)
Basic Send
interface Env {
MY_QUEUE: Queue;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
// Send single message
await env.MY_QUEUE.send({
type: 'email',
to: 'user@example.com',
subject: 'Welcome!',
});
return new Response('Message queued');
},
};
Batch Send
// Send multiple messages
await env.MY_QUEUE.sendBatch([
{ body: { type: 'email', to: 'user1@example.com' } },
{ body: { type: 'email', to: 'user2@example.com' } },
{ body: { type: 'email', to: 'user3@example.com' } },
]);
With Options
// Send with options
await env.MY_QUEUE.send(
{ type: 'reminder', userId: '123' },
{
// Delay delivery (up to 7 days)
delaySeconds: 3600, // 1 hour
// Content type
contentType: 'json',
}
);
Consumer (Processing Messages)
Basic Consumer
interface Env {
MY_QUEUE: Queue;
// Other bindings...
}
interface EmailMessage {
type: 'email';
to: string;
subject: string;
body?: string;
}
export default {
// HTTP handler
async fetch(request: Request, env: Env): Promise<Response> {
// Producer code here
return new Response('OK');
},
// Queue consumer
async queue(
batch: MessageBatch<EmailMessage>,
env: Env,
ctx: ExecutionContext
): Promise<void> {
for (const message of batch.messages) {
try {
await processEmail(message.body);
message.ack(); // Success
} catch (error) {
console.error('Failed:', error);
message.retry(); // Retry later
}
}
},
};
async function processEmail(data: EmailMessage) {
// Send email via SendGrid, etc.
}
Message Methods
async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
for (const message of batch.messages) {
// Message properties
console.log('ID:', message.id);
console.log('Body:', message.body);
console.log('Timestamp:', message.timestamp);
console.log('Attempts:', message.attempts);
try {
await process(message.body);
// Acknowledge success
message.ack();
} catch (error) {
if (isRecoverable(error)) {
// Retry later (with exponential backoff)
message.retry({
delaySeconds: Math.pow(2, message.attempts) * 10,
});
} else {
// Don't retry, send to DLQ if configured
message.ack(); // Or let it go to DLQ after max retries
}
}
}
}
Batch Processing
Process Efficiently
async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
console.log(`Processing batch of ${batch.messages.length} messages`);
// Group by type
const byType = batch.messages.reduce((acc, msg) => {
const type = msg.body.type;
if (!acc[type]) acc[type] = [];
acc[type].push(msg);
return acc;
}, {} as Record<string, Message[]>);
// Process each type
for (const [type, messages] of Object.entries(byType)) {
switch (type) {
case 'email':
await processBatchEmails(messages);
break;
case 'webhook':
await processBatchWebhooks(messages);
break;
}
}
// Ack all at once
batch.ackAll();
}
async function processBatchEmails(messages: Message[]) {
// Batch email sending
const emails = messages.map(m => m.body);
await sendBulkEmail(emails);
}
Dead Letter Queue (DLQ)
# wrangler.toml
# Main queue
[[queues.consumers]]
queue = "email-queue"
max_retries = 3
dead_letter_queue = "email-dlq"
# DLQ consumer (separate worker or same)
[[queues.consumers]]
queue = "email-dlq"
Process DLQ
export default {
// Main queue consumer
async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
// Process main queue...
},
// DLQ consumer (or separate worker)
async queueDLQ(batch: MessageBatch<any>, env: Env): Promise<void> {
for (const message of batch.messages) {
console.error('DLQ message:', {
body: message.body,
attempts: message.attempts,
});
// Log to monitoring
await logToSentry(message);
// Notify team
await notifySlack(`Failed message: ${message.id}`);
message.ack();
}
},
};
Tổng kết
Queue Configuration
[[queues.producers]]
queue = "my-queue"
binding = "MY_QUEUE"
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3
dead_letter_queue = "my-dlq"
Key Concepts
| Concept |
Description |
| Producer |
Sends messages to queue |
| Consumer |
Processes messages |
| Batch |
Group of messages |
| ack() |
Mark as processed |
| retry() |
Retry later |
| DLQ |
Failed messages go here |
Q&A
- Có use case nào cần queue?
- Batch size phù hợp?
- Questions về retry logic?