Skip to content

Cloudflare Queues allow you to process tasks asynchronously in the background. Cloudwerk provides seamless integration for sending and consuming queue messages.

  1. Create a queue via Wrangler:

    Terminal window
    wrangler queues create my-queue
  2. Add the queue binding to wrangler.toml:

    [[queues.producers]]
    binding = "MY_QUEUE"
    queue = "my-queue"
    [[queues.consumers]]
    queue = "my-queue"
    max_batch_size = 10
    max_batch_timeout = 30
  3. Configure Cloudwerk:

    // cloudwerk.config.ts
    import { defineConfig } from '@cloudwerk/core';
    export default defineConfig({
    queues: {
    MY_QUEUE: {
    handler: './workers/queue-handler.ts',
    },
    },
    });
// app/api/orders/route.ts
import { json } from '@cloudwerk/core';
/**
 * Create an order and enqueue its background processing.
 *
 * @param request - Incoming HTTP request whose JSON body is the order payload.
 * @returns 201 with the new order id, or 500 if the insert returned no row.
 */
export async function POST(request: Request, { context }: CloudwerkHandlerContext) {
  const order = await request.json();

  // Save order to database. `executeTakeFirst` resolves to `undefined`
  // when no row comes back, so guard before dereferencing `.id`.
  const savedOrder = await context.db
    .insertInto('orders')
    .values(order)
    .returning(['id'])
    .executeTakeFirst();

  if (!savedOrder) {
    // BUG FIX: previously this fell through and crashed with a TypeError
    // on `savedOrder.id`; surface an explicit error response instead.
    return json({ error: 'Failed to save order' }, { status: 500 });
  }

  // Queue background processing so the HTTP response is not blocked on it.
  await context.queues.MY_QUEUE.send({
    type: 'process_order',
    orderId: savedOrder.id,
  });

  return json({ orderId: savedOrder.id }, { status: 201 });
}
// Send multiple messages
// sendBatch enqueues all entries in a single call; note each element
// wraps its payload in a `body` property, unlike single `send`.
await context.queues.MY_QUEUE.sendBatch([
{ body: { type: 'email', to: 'user1@example.com', template: 'welcome' } },
{ body: { type: 'email', to: 'user2@example.com', template: 'welcome' } },
{ body: { type: 'email', to: 'user3@example.com', template: 'welcome' } },
]);
// Delay message by 60 seconds
// The second argument carries delivery options; `delaySeconds` defers the
// first delivery attempt. NOTE(review): `user` is assumed to be in scope
// in the surrounding handler — confirm against the caller.
await context.queues.MY_QUEUE.send(
{ type: 'reminder', userId: user.id },
{ delaySeconds: 60 }
);

Create a queue handler to process messages:

// workers/queue-handler.ts
import type { QueueHandler, Message } from '@cloudwerk/core';
interface QueueMessage {
type: string;
[key: string]: unknown;
}
export default {
  /**
   * Consume a batch of queue messages sequentially.
   * Each successfully processed message is acknowledged; a failure is
   * logged and the message is re-queued for another delivery attempt.
   */
  async queue(batch: Message<QueueMessage>[], env: Env, ctx: ExecutionContext) {
    for (const msg of batch) {
      try {
        await processMessage(msg.body, env);
        msg.ack();
      } catch (err) {
        console.error('Failed to process message:', err);
        msg.retry();
      }
    }
  },
} satisfies QueueHandler;
/**
 * Dispatch a queue message to the handler matching its `type` tag.
 * Messages with an unrecognized type are logged and otherwise ignored.
 */
async function processMessage(data: QueueMessage, env: Env) {
  if (data.type === 'process_order') {
    await processOrder(data.orderId as string, env);
  } else if (data.type === 'send_email') {
    await sendEmail(data as EmailMessage, env);
  } else {
    console.warn('Unknown message type:', data.type);
  }
}
export default {
  /**
   * Process a batch, distinguishing transient from permanent failures.
   * Transient errors are redelivered after a delay; permanent ones are
   * acked (so the queue drops them) and recorded for investigation.
   */
  async queue(batch: Message[], env: Env) {
    for (const msg of batch) {
      try {
        await processMessage(msg.body);
        // Success: remove the message from the queue.
        msg.ack();
      } catch (err) {
        if (!isRetryable(err)) {
          // Permanent failure: acknowledge so it is not redelivered,
          // then hand it off to dead-letter bookkeeping.
          msg.ack();
          await logFailedMessage(msg, err);
        } else {
          // Transient failure: redeliver after a short delay.
          msg.retry({ delaySeconds: 30 });
        }
      }
    }
  },
};
// workers/email-queue.ts
import type { Message } from '@cloudwerk/core';
interface EmailMessage {
to: string;
template: string;
data: Record<string, unknown>;
}
export default {
  /**
   * Deliver each queued email; a failed send is retried after 60 seconds.
   */
  async queue(batch: Message<EmailMessage>[], env: Env) {
    for (const msg of batch) {
      const { to: recipient, template: tpl, data: tplData } = msg.body;
      try {
        await sendEmail(recipient, tpl, tplData, env);
        msg.ack();
      } catch (err) {
        console.error(`Failed to send email to ${recipient}:`, err);
        msg.retry({ delaySeconds: 60 });
      }
    }
  },
};
/**
 * Send a templated email through the SendGrid v3 mail API.
 *
 * @param to - Recipient address.
 * @param template - SendGrid dynamic template id.
 * @param data - Substitution values for the template.
 * @throws Error when SendGrid responds with a non-2xx status.
 */
async function sendEmail(to: string, template: string, data: Record<string, unknown>, env: Env) {
  const payload = {
    personalizations: [{ to: [{ email: to }] }],
    from: { email: 'noreply@example.com' },
    template_id: template,
    dynamic_template_data: data,
  };

  const response = await fetch('https://api.sendgrid.com/v3/mail/send', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${env.SENDGRID_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(payload),
  });

  if (!response.ok) {
    throw new Error(`SendGrid error: ${response.status}`);
  }
}
// workers/image-queue.ts
interface ImageJob {
imageKey: string;
sizes: { width: number; height: number; suffix: string }[];
}
export default {
  /**
   * Produce resized variants of an image stored in R2.
   * A missing original is acked and skipped (nothing to do);
   * processing errors are logged and the message is retried.
   */
  async queue(batch: Message<ImageJob>[], env: Env) {
    for (const msg of batch) {
      const job = msg.body;
      try {
        // Fetch the original image from R2.
        const original = await env.R2.get(job.imageKey);
        if (!original) {
          // Source object is gone — drop the job instead of retrying forever.
          msg.ack();
          continue;
        }
        // Generate and store each requested size.
        for (const { width, height, suffix } of job.sizes) {
          const variant = await resizeImage(original, width, height);
          // Insert the size suffix just before the file extension.
          const variantKey = job.imageKey.replace(/(\.[^.]+)$/, `_${suffix}$1`);
          await env.R2.put(variantKey, variant);
        }
        msg.ack();
      } catch (err) {
        console.error('Image processing failed:', err);
        msg.retry();
      }
    }
  },
};
// workers/webhook-queue.ts
interface WebhookJob {
url: string;
event: string;
payload: unknown;
retries: number;
}
export default {
  /**
   * Deliver webhook events with signed payloads.
   * HTTP failures and network errors are retried (exponential backoff for
   * HTTP, flat 60s for network) up to 5 times; exhausted messages are
   * acked and recorded via `logWebhookFailure`.
   *
   * NOTE(review): `retries` is read from the message body and is never
   * incremented on redelivery, so this counter may never advance —
   * consider using `message.attempts` instead; confirm intended behavior.
   */
  async queue(batch: Message<WebhookJob>[], env: Env) {
    for (const message of batch) {
      const { url, event, payload, retries = 0 } = message.body;
      try {
        const response = await fetch(url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-Webhook-Event': event,
            'X-Webhook-Signature': await signPayload(payload, env.WEBHOOK_SECRET),
          },
          body: JSON.stringify(payload),
        });
        if (response.ok) {
          message.ack();
        } else if (retries < 5) {
          // Exponential backoff: 1m, 2m, 4m, 8m, 16m.
          const delay = Math.pow(2, retries) * 60;
          message.retry({ delaySeconds: delay });
        } else {
          // Exhausted: drop the message but record the failure.
          message.ack();
          await logWebhookFailure(url, event, payload, env);
        }
      } catch (error) {
        // Network-level failure (DNS, TLS, timeout, ...). BUG FIX: this
        // error was previously swallowed, and exhausted messages were
        // acked without being recorded — now log and record them, matching
        // the non-2xx branch above.
        console.error(`Webhook delivery to ${url} failed:`, error);
        if (retries < 5) {
          message.retry({ delaySeconds: 60 });
        } else {
          message.ack();
          await logWebhookFailure(url, event, payload, env);
        }
      }
    }
  },
};
export default {
  /**
   * Process messages with exponential backoff, dead-lettering any message
   * that has already been attempted 5 times.
   */
  async queue(batch: Message[], env: Env) {
    for (const message of batch) {
      // Delivery count maintained by the queue runtime; default to 1 for safety.
      const attempts = message.attempts ?? 1;
      try {
        await processMessage(message.body);
        message.ack();
      } catch (error) {
        // BUG FIX: `error` is `unknown` under strict mode
        // (`useUnknownInCatchVariables`); narrow before reading `.message`.
        const errorMessage = error instanceof Error ? error.message : String(error);
        if (attempts >= 5) {
          // Max retries reached — forward to the dead letter queue so the
          // failure can be inspected, then ack to stop redelivery.
          await env.DEAD_LETTER_QUEUE.send({
            originalMessage: message.body,
            error: errorMessage,
            attempts,
          });
          message.ack();
        } else {
          // Exponential backoff: 1m, 2m, 4m, 8m, 16m.
          const delaySeconds = Math.pow(2, attempts - 1) * 60;
          message.retry({ delaySeconds });
        }
      }
    }
  },
};
# Set up dead letter handling
# wrangler.toml
[[queues.producers]]
binding = "DEAD_LETTER"
queue = "dead-letter-queue"
[[queues.consumers]]
queue = "dead-letter-queue"
max_batch_size = 10
// workers/dead-letter-handler.ts
export default {
  /**
   * Persist each dead-lettered message for later investigation and
   * alert the operations team.
   */
  async queue(batch: Message[], env: Env) {
    for (const msg of batch) {
      // Record the failed payload and its error in D1.
      const insert = env.DB.prepare(`
INSERT INTO failed_jobs (payload, error, created_at)
VALUES (?, ?, datetime('now'))
`);
      await insert
        .bind(JSON.stringify(msg.body.originalMessage), msg.body.error)
        .run();
      // Page the operations team about the dead letter.
      await sendAlert('Dead letter received', msg.body);
      msg.ack();
    }
  },
};
export default {
  /**
   * Idempotent order processing: a `processed_messages` table keyed by
   * message id prevents the same delivery from being handled twice
   * (queues are at-least-once).
   *
   * NOTE(review): the existence check and the insert below are not atomic,
   * so two concurrent consumers could still double-process — confirm the
   * consumer concurrency settings, or add a unique constraint with
   * INSERT OR IGNORE.
   */
  async queue(batch: Message[], env: Env) {
    for (const message of batch) {
      const { orderId, action } = message.body;
      // Check if already processed
      const existing = await env.DB.prepare(`
SELECT id FROM processed_messages WHERE message_id = ?
`).bind(message.id).first();
      if (existing) {
        message.ack();
        continue;
      }
      try {
        await processOrder(orderId, action, env);
        // Mark as processed so a redelivery is skipped next time.
        await env.DB.prepare(`
INSERT INTO processed_messages (message_id, processed_at)
VALUES (?, datetime('now'))
`).bind(message.id).run();
        message.ack();
      } catch (error) {
        // BUG FIX: the failure was previously swallowed silently; log it
        // with context (consistent with the other handlers) before retrying.
        console.error(`Order processing failed for message ${message.id}:`, error);
        message.retry();
      }
    }
  },
};