Skip to content

Examples

Practical examples for common use cases.

Email Queue

Send emails with retry and rate limiting.

import { Queue, Worker } from 'bunqueue/client';
const emailQueue = new Queue('emails');
// Rate limit: 100 emails per second
emailQueue.setRateLimit(100);
// Add email job
await emailQueue.add('send', {
to: 'user@example.com',
subject: 'Welcome!',
template: 'welcome',
data: { name: 'John' }
}, {
priority: 10,
attempts: 3,
backoff: 5000
});
// Process emails
const worker = new Worker('emails', async (job) => {
const { to, subject, template, data } = job.data;
await job.updateProgress(10, 'Loading template');
const html = await loadTemplate(template, data);
await job.updateProgress(50, 'Sending email');
const result = await sendEmail({ to, subject, html });
await job.updateProgress(100, 'Sent');
return { messageId: result.id };
});
worker.on('completed', (job, result) => {
console.log(`Email sent to ${job.data.to}: ${result.messageId}`);
});
worker.on('failed', (job, error) => {
console.error(`Failed to send to ${job.data.to}:`, error.message);
});

Image Processing Pipeline

Process images with parent-child flow.

import { Queue, Worker, FlowProducer } from 'bunqueue/client';
// Create queues
const uploadQueue = new Queue('uploads');
const resizeQueue = new Queue('resize');
const thumbnailQueue = new Queue('thumbnails');
// Flow producer for dependencies
const flow = new FlowProducer();
// Add image processing flow
await flow.add({
name: 'process-image',
queueName: 'uploads',
data: { imageId: 'img-123', path: '/uploads/photo.jpg' },
children: [
{
name: 'resize-large',
queueName: 'resize',
data: { size: 'large', width: 1920, height: 1080 }
},
{
name: 'resize-medium',
queueName: 'resize',
data: { size: 'medium', width: 800, height: 600 }
},
{
name: 'thumbnail',
queueName: 'thumbnails',
data: { size: 'thumb', width: 150, height: 150 }
}
]
});
// Upload worker
new Worker('uploads', async (job) => {
const { imageId, path } = job.data;
// Validate image
const metadata = await getImageMetadata(path);
// Wait for all resizes to complete
const childResults = await job.getChildrenValues();
// Update database with all sizes
await db.images.update(imageId, {
original: path,
large: childResults['resize-large'],
medium: childResults['resize-medium'],
thumbnail: childResults['thumbnail']
});
return { processed: true };
});
// Resize workers
new Worker('resize', async (job) => {
const { size, width, height } = job.data;
const parent = await job.getParent();
const { path } = parent.data;
const resized = await resizeImage(path, width, height);
const outputPath = `/processed/${size}/${Date.now()}.jpg`;
await saveImage(resized, outputPath);
return outputPath;
}, { concurrency: 4 });
// Thumbnail worker
new Worker('thumbnails', async (job) => {
const { width, height } = job.data;
const parent = await job.getParent();
const { path } = parent.data;
const thumb = await createThumbnail(path, width, height);
const outputPath = `/thumbnails/${Date.now()}.jpg`;
await saveImage(thumb, outputPath);
return outputPath;
});

Scheduled Reports

Generate reports on a schedule.

import { Queue, Worker } from 'bunqueue/client';
const reportsQueue = new Queue('reports');
// Daily report at 6 AM
await reportsQueue.add('daily-sales', {
type: 'sales',
period: 'daily'
}, {
repeat: { pattern: '0 6 * * *' }
});
// Weekly report every Monday at 9 AM
await reportsQueue.add('weekly-summary', {
type: 'summary',
period: 'weekly'
}, {
repeat: { pattern: '0 9 * * 1' }
});
// Every 30 minutes
await reportsQueue.add('system-health', {
type: 'health'
}, {
repeat: { every: 1800000 } // 30 minutes in ms
});
// Report worker
new Worker('reports', async (job) => {
const { type, period } = job.data;
await job.log(`Generating ${type} report`);
let data;
switch (type) {
case 'sales':
data = await generateSalesReport(period);
break;
case 'summary':
data = await generateSummaryReport();
break;
case 'health':
data = await checkSystemHealth();
break;
}
// Send via email or save
await sendReport(type, data);
return { generated: new Date().toISOString() };
});

Webhook Delivery

Reliable webhook delivery with retries.

import { Queue, Worker } from 'bunqueue/client';
const webhookQueue = new Queue('webhooks');
// Configure stall detection
webhookQueue.setStallConfig({
enabled: true,
stallInterval: 30000,
maxStalls: 2
});
// Add webhook job
async function deliverWebhook(url: string, event: string, payload: any) {
await webhookQueue.add('deliver', {
url,
event,
payload,
timestamp: Date.now()
}, {
attempts: 5,
backoff: 10000, // 10s base backoff
removeOnComplete: true
});
}
// Webhook worker
new Worker('webhooks', async (job) => {
const { url, event, payload, timestamp } = job.data;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Event': event,
'X-Webhook-Timestamp': String(timestamp),
'X-Webhook-Signature': sign(payload)
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30000)
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return {
status: response.status,
delivered: Date.now()
};
}, { concurrency: 10 });
// Track failed webhooks
webhookQueue.on('failed', async (job, error) => {
if (job.attemptsMade >= job.opts.attempts) {
// Max retries reached, notify admin
await notifyAdmin({
type: 'webhook_failed',
url: job.data.url,
error: error.message
});
}
});

Data Pipeline

ETL pipeline with stages.

import { Queue, Worker } from 'bunqueue/client';
// Stage queues
const extractQueue = new Queue('extract');
const transformQueue = new Queue('transform');
const loadQueue = new Queue('load');
// Extract worker - fetch data from source
new Worker('extract', async (job) => {
const { source, query, batchId } = job.data;
await job.updateProgress(0, 'Connecting to source');
const connection = await connectToSource(source);
await job.updateProgress(25, 'Executing query');
const records = await connection.query(query);
await job.updateProgress(50, 'Fetched records');
// Send to transform stage
await transformQueue.add('transform', {
batchId,
records,
source
});
await job.updateProgress(100, 'Sent to transform');
return { recordCount: records.length };
}, { concurrency: 2 });
// Transform worker - clean and enrich data
new Worker('transform', async (job) => {
const { batchId, records, source } = job.data;
const transformed = [];
for (let i = 0; i < records.length; i++) {
const record = records[i];
// Clean data
const cleaned = cleanRecord(record);
// Enrich data
const enriched = await enrichRecord(cleaned);
transformed.push(enriched);
// Update progress
const progress = Math.round((i / records.length) * 100);
await job.updateProgress(progress);
}
// Send to load stage
await loadQueue.add('load', {
batchId,
records: transformed,
source
});
return { transformedCount: transformed.length };
}, { concurrency: 4 });
// Load worker - write to destination
new Worker('load', async (job) => {
const { batchId, records, source } = job.data;
await job.updateProgress(0, 'Connecting to destination');
const dest = await connectToDestination();
// Batch insert
const batchSize = 1000;
for (let i = 0; i < records.length; i += batchSize) {
const batch = records.slice(i, i + batchSize);
await dest.insertMany(batch);
const progress = Math.round((i / records.length) * 100);
await job.updateProgress(progress, `Loaded ${i} records`);
}
await job.log(`Completed batch ${batchId} from ${source}`);
return {
loadedCount: records.length,
batchId
};
});

Distributed Task Processing

Multi-worker task distribution.

// server.ts - Start bunqueue server
import { createServer } from 'bunqueue/server';
const server = createServer({
tcpPort: 6789,
httpPort: 6790,
dataPath: './data/tasks.db'
});
server.start();
// producer.ts - Add tasks
import { Queue } from 'bunqueue/client';
const taskQueue = new Queue('tasks', {
connection: { host: 'localhost', port: 6789 }
});
// Add 10,000 tasks
const tasks = Array.from({ length: 10000 }, (_, i) => ({
name: 'process',
data: { taskId: i, payload: `data-${i}` }
}));
await taskQueue.addBulk(tasks);
console.log('Added 10,000 tasks');
// worker-1.ts, worker-2.ts, etc. - Process tasks
import { Worker } from 'bunqueue/client';
const worker = new Worker('tasks', async (job) => {
const { taskId, payload } = job.data;
// Simulate work
await Bun.sleep(100);
return { processed: taskId };
}, {
connection: { host: 'localhost', port: 6789 },
concurrency: 50
});
worker.on('completed', () => {
console.log(`Worker processed job`);
});

Graceful Shutdown

Handle shutdown properly.

import { Queue, Worker } from 'bunqueue/client';
const queue = new Queue('tasks');
const worker = new Worker('tasks', async (job) => {
// Long-running task
for (let i = 0; i < 100; i++) {
await Bun.sleep(100);
await job.updateProgress(i);
// Check if we should stop
if (worker.isPaused()) {
throw new Error('Worker shutting down');
}
}
return { done: true };
});
// Handle shutdown signals
async function shutdown(signal: string) {
console.log(`Received ${signal}, shutting down...`);
// Stop accepting new jobs
await worker.pause();
// Wait for current jobs to complete (max 30s)
const timeout = setTimeout(() => {
console.log('Timeout waiting for jobs, forcing exit');
process.exit(1);
}, 30000);
// Wait for active jobs
await worker.close();
clearTimeout(timeout);
// Close queue connection
await queue.close();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

Job with Timeout

Set timeouts for long-running jobs.

import { Queue, Worker } from 'bunqueue/client';
const queue = new Queue('processing');
// Add job with 60-second timeout
await queue.add('long-task', {
data: 'large-dataset'
}, {
timeout: 60000,
attempts: 2
});
// Worker with timeout handling
new Worker('processing', async (job) => {
const controller = new AbortController();
const timeoutId = setTimeout(() => {
controller.abort();
}, job.opts.timeout || 60000);
try {
await processData(job.data, { signal: controller.signal });
return { success: true };
} finally {
clearTimeout(timeoutId);
}
});

Unique Jobs

Prevent duplicate jobs.

import { Queue } from 'bunqueue/client';
const queue = new Queue('notifications');
// Add unique job - won't duplicate
await queue.add('notify', {
userId: 'user-123',
type: 'welcome'
}, {
jobId: 'welcome-user-123' // Unique identifier
});
// This will be ignored (same jobId)
await queue.add('notify', {
userId: 'user-123',
type: 'welcome'
}, {
jobId: 'welcome-user-123'
});
// Or use unique key
await queue.add('daily-digest', {
userId: 'user-123'
}, {
uniqueKey: `digest-user-123-${new Date().toDateString()}`
});

Queue Groups

Group related queues together.

import { Queue, Worker, QueueGroup } from 'bunqueue/client';
// Create group for email-related queues
const emailGroup = new QueueGroup('email');
// Add queues to group
const welcomeQueue = emailGroup.createQueue('welcome');
const notificationQueue = emailGroup.createQueue('notifications');
const digestQueue = emailGroup.createQueue('digest');
// Pause all email queues at once
await emailGroup.pauseAll();
// Resume all
await emailGroup.resumeAll();
// Get stats for all queues in group
const stats = await emailGroup.getStats();
console.log('Total pending:', stats.totalPending);

Monitoring with Events

Comprehensive event monitoring.

import { Queue, Worker, QueueEvents } from 'bunqueue/client';
const queue = new Queue('tasks');
const events = new QueueEvents('tasks');
// Job lifecycle events
events.on('added', ({ jobId, name }) => {
console.log(`Job ${jobId} (${name}) added to queue`);
});
events.on('active', ({ jobId }) => {
console.log(`Job ${jobId} started processing`);
});
events.on('progress', ({ jobId, progress }) => {
console.log(`Job ${jobId}: ${progress}%`);
});
events.on('completed', ({ jobId, result }) => {
console.log(`Job ${jobId} completed:`, result);
});
events.on('failed', ({ jobId, error }) => {
console.error(`Job ${jobId} failed:`, error);
});
events.on('stalled', ({ jobId }) => {
console.warn(`Job ${jobId} stalled`);
});
// Worker with events
const worker = new Worker('tasks', async (job) => {
await job.updateProgress(50);
return { done: true };
});
worker.on('error', (error) => {
console.error('Worker error:', error);
});