Bỏ qua

Cloudflare Queues Concepts

Queues là gì?

Message Queue Pattern

┌─────────────────────────────────────────────────────────────┐
│                  CLOUDFLARE QUEUES                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Producer                    Queue                Consumer   │
│  (Worker A)                                      (Worker B)  │
│      │                                               │       │
│      │    ┌─────────────────────────────┐           │       │
│      └───▶│  msg1  msg2  msg3  msg4     │──────────▶│       │
│           └─────────────────────────────┘           │       │
│                                                              │
│  Features:                                                   │
│  - Guaranteed delivery                                      │
│  - At-least-once semantics                                 │
│  - Automatic retries                                        │
│  - Batch processing                                         │
│  - Dead letter queue                                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘

When to Use Queues

Use Cases

✅ GOOD for:
   - Email/notification sending
   - Image/video processing
   - Webhook delivery
   - Log aggregation
   - Order processing
   - Any async task

❌ NOT for:
   - Real-time communication (use WebSocket)
   - Request-response patterns (use direct calls)
   - Time-sensitive data (has latency)

Create Queue

Via Wrangler

# Create queue
wrangler queues create my-queue

# List queues
wrangler queues list

# Delete queue
wrangler queues delete my-queue

Configuration

# wrangler.toml

# Producer binding (send messages)
[[queues.producers]]
queue = "my-queue"
binding = "MY_QUEUE"

# Consumer binding (receive messages)
[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10        # 1-100 messages per batch
max_batch_timeout = 30     # Max seconds to wait for batch
max_retries = 3            # Retry count before dead letter
dead_letter_queue = "my-dlq"  # Optional DLQ

Producer (Sending Messages)

Basic Send

interface Env {
  MY_QUEUE: Queue;
}

export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    // Send single message
    await env.MY_QUEUE.send({
      type: 'email',
      to: 'user@example.com',
      subject: 'Welcome!',
    });

    return new Response('Message queued');
  },
};

Batch Send

// Send multiple messages
await env.MY_QUEUE.sendBatch([
  { body: { type: 'email', to: 'user1@example.com' } },
  { body: { type: 'email', to: 'user2@example.com' } },
  { body: { type: 'email', to: 'user3@example.com' } },
]);

With Options

// Send with options
await env.MY_QUEUE.send(
  { type: 'reminder', userId: '123' },
  {
    // Delay delivery (up to 7 days)
    delaySeconds: 3600,  // 1 hour

    // Content type
    contentType: 'json',
  }
);

Consumer (Processing Messages)

Basic Consumer

interface Env {
  MY_QUEUE: Queue;
  // Other bindings...
}

interface EmailMessage {
  type: 'email';
  to: string;
  subject: string;
  body?: string;
}

export default {
  // HTTP handler
  async fetch(request: Request, env: Env): Promise<Response> {
    // Producer code here
    return new Response('OK');
  },

  // Queue consumer
  async queue(
    batch: MessageBatch<EmailMessage>,
    env: Env,
    ctx: ExecutionContext
  ): Promise<void> {
    for (const message of batch.messages) {
      try {
        await processEmail(message.body);
        message.ack();  // Success
      } catch (error) {
        console.error('Failed:', error);
        message.retry();  // Retry later
      }
    }
  },
};

async function processEmail(data: EmailMessage) {
  // Send email via SendGrid, etc.
}

Message Methods

async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
  for (const message of batch.messages) {
    // Message properties
    console.log('ID:', message.id);
    console.log('Body:', message.body);
    console.log('Timestamp:', message.timestamp);
    console.log('Attempts:', message.attempts);

    try {
      await process(message.body);

      // Acknowledge success
      message.ack();

    } catch (error) {
      if (isRecoverable(error)) {
        // Retry later (with exponential backoff)
        message.retry({
          delaySeconds: Math.pow(2, message.attempts) * 10,
        });
      } else {
        // Don't retry, send to DLQ if configured
        message.ack();  // Or let it go to DLQ after max retries
      }
    }
  }
}

Batch Processing

Process Efficiently

async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
  console.log(`Processing batch of ${batch.messages.length} messages`);

  // Group by type
  const byType = batch.messages.reduce((acc, msg) => {
    const type = msg.body.type;
    if (!acc[type]) acc[type] = [];
    acc[type].push(msg);
    return acc;
  }, {} as Record<string, Message[]>);

  // Process each type
  for (const [type, messages] of Object.entries(byType)) {
    switch (type) {
      case 'email':
        await processBatchEmails(messages);
        break;
      case 'webhook':
        await processBatchWebhooks(messages);
        break;
    }
  }

  // Ack all at once
  batch.ackAll();
}

async function processBatchEmails(messages: Message[]) {
  // Batch email sending
  const emails = messages.map(m => m.body);
  await sendBulkEmail(emails);
}

Dead Letter Queue (DLQ)

Configure DLQ

# wrangler.toml

# Main queue
[[queues.consumers]]
queue = "email-queue"
max_retries = 3
dead_letter_queue = "email-dlq"

# DLQ consumer (separate worker or same)
[[queues.consumers]]
queue = "email-dlq"

Process DLQ

export default {
  // Main queue consumer
  async queue(batch: MessageBatch<any>, env: Env): Promise<void> {
    // Process main queue...
  },

  // DLQ consumer (or separate worker)
  async queueDLQ(batch: MessageBatch<any>, env: Env): Promise<void> {
    for (const message of batch.messages) {
      console.error('DLQ message:', {
        body: message.body,
        attempts: message.attempts,
      });

      // Log to monitoring
      await logToSentry(message);

      // Notify team
      await notifySlack(`Failed message: ${message.id}`);

      message.ack();
    }
  },
};

Tổng kết

Queue Configuration

[[queues.producers]]
queue = "my-queue"
binding = "MY_QUEUE"

[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10
max_batch_timeout = 30
max_retries = 3
dead_letter_queue = "my-dlq"

Key Concepts

Concept Description
Producer Sends messages to queue
Consumer Processes messages
Batch Group of messages
ack() Mark as processed
retry() Retry later
DLQ Failed messages go here

Q&A

  1. Có use case nào cần queue?
  2. Batch size phù hợp?
  3. Questions về retry logic?