# Examples
Practical examples for common use cases.
## Email Queue
Send emails with retry and rate limiting.
```ts
import { Queue, Worker } from 'bunqueue/client';

const emailQueue = new Queue('emails');

// Rate limit: 100 emails per second
emailQueue.setRateLimit(100);

// Add email job
await emailQueue.add('send', {
  to: 'user@example.com',
  subject: 'Welcome!',
  template: 'welcome',
  data: { name: 'John' }
}, {
  priority: 10,
  attempts: 3,
  backoff: 5000
});

// Process emails
const worker = new Worker('emails', async (job) => {
  const { to, subject, template, data } = job.data;

  await job.updateProgress(10, 'Loading template');
  const html = await loadTemplate(template, data);

  await job.updateProgress(50, 'Sending email');
  const result = await sendEmail({ to, subject, html });

  await job.updateProgress(100, 'Sent');
  return { messageId: result.id };
});

worker.on('completed', (job, result) => {
  console.log(`Email sent to ${job.data.to}: ${result.messageId}`);
});

worker.on('failed', (job, error) => {
  console.error(`Failed to send to ${job.data.to}:`, error.message);
});
```
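The `loadTemplate` and `sendEmail` helpers above are assumed, not part of bunqueue. A minimal sketch, assuming HTML templates on disk and a hypothetical HTTP mail API behind `MAIL_API_URL`:

```ts
// Hypothetical helpers for the example above, not part of bunqueue.
async function loadTemplate(name: string, data: Record<string, string>) {
  // Assumes templates like ./templates/welcome.html with {{name}} placeholders
  let html = await Bun.file(`./templates/${name}.html`).text();
  for (const [key, value] of Object.entries(data)) {
    html = html.replaceAll(`{{${key}}}`, value); // naive substitution
  }
  return html;
}

async function sendEmail(msg: { to: string; subject: string; html: string }) {
  // MAIL_API_URL is an assumed mail-provider endpoint
  const res = await fetch(process.env.MAIL_API_URL!, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(msg)
  });
  if (!res.ok) throw new Error(`Mail API returned ${res.status}`);
  return res.json() as Promise<{ id: string }>; // worker reads result.id
}
```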
## Image Processing Pipeline

Process images with a parent-child flow: the resize children run first, and the parent job collects their results.
```ts
import { Queue, Worker, FlowProducer } from 'bunqueue/client';

// Create queues
const uploadQueue = new Queue('uploads');
const resizeQueue = new Queue('resize');
const thumbnailQueue = new Queue('thumbnails');

// Flow producer for dependencies
const flow = new FlowProducer();

// Add image processing flow
await flow.add({
  name: 'process-image',
  queueName: 'uploads',
  data: { imageId: 'img-123', path: '/uploads/photo.jpg' },
  children: [
    {
      name: 'resize-large',
      queueName: 'resize',
      data: { size: 'large', width: 1920, height: 1080 }
    },
    {
      name: 'resize-medium',
      queueName: 'resize',
      data: { size: 'medium', width: 800, height: 600 }
    },
    {
      name: 'thumbnail',
      queueName: 'thumbnails',
      data: { size: 'thumb', width: 150, height: 150 }
    }
  ]
});

// Upload worker
new Worker('uploads', async (job) => {
  const { imageId, path } = job.data;

  // Validate image
  const metadata = await getImageMetadata(path);

  // Wait for all resizes to complete
  const childResults = await job.getChildrenValues();

  // Update database with all sizes
  await db.images.update(imageId, {
    original: path,
    large: childResults['resize-large'],
    medium: childResults['resize-medium'],
    thumbnail: childResults['thumbnail']
  });

  return { processed: true };
});

// Resize workers
new Worker('resize', async (job) => {
  const { size, width, height } = job.data;
  const parent = await job.getParent();
  const { path } = parent.data;

  const resized = await resizeImage(path, width, height);
  const outputPath = `/processed/${size}/${Date.now()}.jpg`;
  await saveImage(resized, outputPath);

  return outputPath;
}, { concurrency: 4 });

// Thumbnail worker
new Worker('thumbnails', async (job) => {
  const { width, height } = job.data;
  const parent = await job.getParent();
  const { path } = parent.data;

  const thumb = await createThumbnail(path, width, height);
  const outputPath = `/thumbnails/${Date.now()}.jpg`;
  await saveImage(thumb, outputPath);

  return outputPath;
});
```
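The image helpers (`getImageMetadata`, `resizeImage`, `createThumbnail`, `saveImage`) are placeholders. One possible sketch using the `sharp` library (an assumption; any image library would do):

```ts
// Hypothetical image helpers, sketched with the sharp library.
// These are assumptions, not something bunqueue provides.
import sharp from 'sharp';

async function getImageMetadata(path: string) {
  return sharp(path).metadata(); // width, height, format, ...
}

async function resizeImage(path: string, width: number, height: number) {
  return sharp(path).resize(width, height, { fit: 'inside' }).jpeg().toBuffer();
}

async function createThumbnail(path: string, width: number, height: number) {
  return sharp(path).resize(width, height, { fit: 'cover' }).jpeg().toBuffer();
}

async function saveImage(image: Buffer, outputPath: string) {
  await Bun.write(outputPath, image);
}
```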
## Scheduled Reports

Generate reports on a schedule.
```ts
import { Queue, Worker } from 'bunqueue/client';

const reportsQueue = new Queue('reports');

// Daily report at 6 AM
await reportsQueue.add('daily-sales', {
  type: 'sales',
  period: 'daily'
}, {
  repeat: { pattern: '0 6 * * *' }
});

// Weekly report every Monday at 9 AM
await reportsQueue.add('weekly-summary', {
  type: 'summary',
  period: 'weekly'
}, {
  repeat: { pattern: '0 9 * * 1' }
});

// Every 30 minutes
await reportsQueue.add('system-health', {
  type: 'health'
}, {
  repeat: { every: 1800000 } // 30 minutes in ms
});

// Report worker
new Worker('reports', async (job) => {
  const { type, period } = job.data;

  await job.log(`Generating ${type} report`);

  let data;
  switch (type) {
    case 'sales':
      data = await generateSalesReport(period);
      break;
    case 'summary':
      data = await generateSummaryReport();
      break;
    case 'health':
      data = await checkSystemHealth();
      break;
  }

  // Send via email or save
  await sendReport(type, data);

  return { generated: new Date().toISOString() };
});
```
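The reporting functions themselves (`generateSalesReport` and friends) are assumed. If you need one, a small helper like the following could translate the `period` string into a concrete date window to query against (purely illustrative):

```ts
// Hypothetical helper: map a period name to a date range for the report query.
function periodWindow(period: 'daily' | 'weekly', now = new Date()) {
  const end = new Date(now);
  const start = new Date(now);
  start.setDate(start.getDate() - (period === 'daily' ? 1 : 7));
  return { start, end };
}
```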
## Webhook Delivery

Reliable webhook delivery with retries.
```ts
import { Queue, Worker } from 'bunqueue/client';

const webhookQueue = new Queue('webhooks');

// Configure stall detection
webhookQueue.setStallConfig({
  enabled: true,
  stallInterval: 30000,
  maxStalls: 2
});

// Add webhook job
async function deliverWebhook(url: string, event: string, payload: any) {
  await webhookQueue.add('deliver', {
    url,
    event,
    payload,
    timestamp: Date.now()
  }, {
    attempts: 5,
    backoff: 10000, // 10s base backoff
    removeOnComplete: true
  });
}

// Webhook worker
new Worker('webhooks', async (job) => {
  const { url, event, payload, timestamp } = job.data;

  const response = await fetch(url, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'X-Webhook-Event': event,
      'X-Webhook-Timestamp': String(timestamp),
      'X-Webhook-Signature': sign(payload)
    },
    body: JSON.stringify(payload),
    signal: AbortSignal.timeout(30000)
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  }

  return { status: response.status, delivered: Date.now() };
}, { concurrency: 10 });

// Track failed webhooks
webhookQueue.on('failed', async (job, error) => {
  if (job.attemptsMade >= job.opts.attempts) {
    // Max retries reached, notify admin
    await notifyAdmin({
      type: 'webhook_failed',
      url: job.data.url,
      error: error.message
    });
  }
});
```
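The `sign` function is assumed. A common scheme, and one possible implementation, is an HMAC-SHA256 over the JSON body with a shared secret the receiver can verify:

```ts
// Hypothetical sign() implementation: HMAC-SHA256 with an assumed shared
// secret; the receiver recomputes the digest to verify authenticity.
import { createHmac } from 'node:crypto';

function sign(payload: unknown): string {
  return createHmac('sha256', process.env.WEBHOOK_SECRET!)
    .update(JSON.stringify(payload))
    .digest('hex');
}
```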
## Data Pipeline

An ETL pipeline with separate extract, transform, and load stages, each backed by its own queue.
```ts
import { Queue, Worker } from 'bunqueue/client';

// Stage queues
const extractQueue = new Queue('extract');
const transformQueue = new Queue('transform');
const loadQueue = new Queue('load');

// Extract worker - fetch data from source
new Worker('extract', async (job) => {
  const { source, query, batchId } = job.data;

  await job.updateProgress(0, 'Connecting to source');
  const connection = await connectToSource(source);

  await job.updateProgress(25, 'Executing query');
  const records = await connection.query(query);

  await job.updateProgress(50, 'Fetched records');

  // Send to transform stage
  await transformQueue.add('transform', { batchId, records, source });

  await job.updateProgress(100, 'Sent to transform');
  return { recordCount: records.length };
}, { concurrency: 2 });

// Transform worker - clean and enrich data
new Worker('transform', async (job) => {
  const { batchId, records, source } = job.data;

  const transformed = [];
  for (let i = 0; i < records.length; i++) {
    const record = records[i];

    // Clean data
    const cleaned = cleanRecord(record);

    // Enrich data
    const enriched = await enrichRecord(cleaned);

    transformed.push(enriched);

    // Update progress
    const progress = Math.round(((i + 1) / records.length) * 100);
    await job.updateProgress(progress);
  }

  // Send to load stage
  await loadQueue.add('load', { batchId, records: transformed, source });

  return { transformedCount: transformed.length };
}, { concurrency: 4 });

// Load worker - write to destination
new Worker('load', async (job) => {
  const { batchId, records, source } = job.data;

  await job.updateProgress(0, 'Connecting to destination');
  const dest = await connectToDestination();

  // Batch insert
  const batchSize = 1000;
  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);
    await dest.insertMany(batch);

    const loaded = Math.min(i + batchSize, records.length);
    const progress = Math.round((loaded / records.length) * 100);
    await job.updateProgress(progress, `Loaded ${loaded} records`);
  }

  await job.log(`Completed batch ${batchId} from ${source}`);

  return { loadedCount: records.length, batchId };
});
```
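`cleanRecord` and `enrichRecord` are stand-ins for your own transform logic. A minimal sketch, assuming string normalization and a hypothetical lookup service at `ENRICH_API_URL`:

```ts
// Hypothetical transform helpers, illustrative only.
function cleanRecord(record: Record<string, any>) {
  return {
    ...record,
    email: String(record.email ?? '').trim().toLowerCase(),
    name: String(record.name ?? '').trim()
  };
}

async function enrichRecord(record: Record<string, any>) {
  // ENRICH_API_URL is an assumed lookup service (geo, account tier, etc.)
  const res = await fetch(
    `${process.env.ENRICH_API_URL}/lookup?email=${encodeURIComponent(record.email)}`
  );
  return { ...record, enrichment: res.ok ? await res.json() : null };
}
```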
## Distributed Task Processing

Distribute tasks across multiple worker processes connected to one bunqueue server.
```ts
// server.ts - Start bunqueue server
import { createServer } from 'bunqueue/server';

const server = createServer({
  tcpPort: 6789,
  httpPort: 6790,
  dataPath: './data/tasks.db'
});

server.start();
```

```ts
// producer.ts - Add tasks
import { Queue } from 'bunqueue/client';

const taskQueue = new Queue('tasks', {
  connection: { host: 'localhost', port: 6789 }
});

// Add 10,000 tasks
const tasks = Array.from({ length: 10000 }, (_, i) => ({
  name: 'process',
  data: { taskId: i, payload: `data-${i}` }
}));

await taskQueue.addBulk(tasks);
console.log('Added 10,000 tasks');
```

```ts
// worker-1.ts, worker-2.ts, etc. - Process tasks
import { Worker } from 'bunqueue/client';

const worker = new Worker('tasks', async (job) => {
  const { taskId, payload } = job.data;

  // Simulate work
  await Bun.sleep(100);

  return { processed: taskId };
}, {
  connection: { host: 'localhost', port: 6789 },
  concurrency: 50
});

worker.on('completed', () => {
  console.log('Worker processed job');
});
```
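Start the server first (`bun run server.ts`), then launch as many copies of the worker script as you need; every worker pulls from the same queue over TCP, so adding processes scales throughput horizontally.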
## Graceful Shutdown

Handle shutdown signals cleanly, draining active jobs before exit.
```ts
import { Queue, Worker } from 'bunqueue/client';

const queue = new Queue('tasks');

const worker = new Worker('tasks', async (job) => {
  // Long-running task
  for (let i = 0; i < 100; i++) {
    await Bun.sleep(100);
    await job.updateProgress(i);

    // Check if we should stop
    if (worker.isPaused()) {
      throw new Error('Worker shutting down');
    }
  }
  return { done: true };
});

// Handle shutdown signals
async function shutdown(signal: string) {
  console.log(`Received ${signal}, shutting down...`);

  // Stop accepting new jobs
  await worker.pause();

  // Wait for current jobs to complete (max 30s)
  const timeout = setTimeout(() => {
    console.log('Timeout waiting for jobs, forcing exit');
    process.exit(1);
  }, 30000);

  // Wait for active jobs
  await worker.close();
  clearTimeout(timeout);

  // Close queue connection
  await queue.close();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
```
## Job with Timeout

Set timeouts for long-running jobs.
```ts
import { Queue, Worker } from 'bunqueue/client';

const queue = new Queue('processing');

// Add job with 60-second timeout
await queue.add('long-task', {
  data: 'large-dataset'
}, {
  timeout: 60000,
  attempts: 2
});

// Worker with timeout handling
new Worker('processing', async (job) => {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => {
    controller.abort();
  }, job.opts.timeout || 60000);

  try {
    await processData(job.data, { signal: controller.signal });
    return { success: true };
  } finally {
    clearTimeout(timeoutId);
  }
});
```
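For the abort to actually interrupt anything, `processData` must honor the signal. A sketch of a cooperative, signal-aware implementation (the chunked loop is an assumption):

```ts
// Hypothetical processData that cooperates with the AbortSignal above,
// checking it between chunks so a timeout stops work promptly.
async function processData(data: unknown, opts: { signal: AbortSignal }) {
  for (let chunk = 0; chunk < 10; chunk++) {
    if (opts.signal.aborted) {
      throw new Error('Processing aborted: timeout exceeded');
    }
    await Bun.sleep(50); // stand-in for one chunk of real work
  }
}
```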
## Unique Jobs

Prevent duplicate jobs.
```ts
import { Queue } from 'bunqueue/client';

const queue = new Queue('notifications');

// Add unique job - won't duplicate
await queue.add('notify', {
  userId: 'user-123',
  type: 'welcome'
}, {
  jobId: 'welcome-user-123' // Unique identifier
});

// This will be ignored (same jobId)
await queue.add('notify', {
  userId: 'user-123',
  type: 'welcome'
}, {
  jobId: 'welcome-user-123'
});

// Or use unique key
await queue.add('daily-digest', {
  userId: 'user-123'
}, {
  uniqueKey: `digest-user-123-${new Date().toDateString()}`
});
```
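Building the identifier with a small helper keeps the dedup key deterministic. For example (the helper is illustrative, not a bunqueue API):

```ts
// Hypothetical helper: one digest job per user per calendar day.
// An ISO date keeps keys stable and sortable.
function dailyDigestKey(userId: string, date = new Date()): string {
  return `digest-${userId}-${date.toISOString().slice(0, 10)}`;
}

await queue.add('daily-digest', { userId: 'user-123' }, {
  uniqueKey: dailyDigestKey('user-123') // e.g. digest-user-123-2024-05-01
});
```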
## Queue Groups

Group related queues together.
```ts
import { QueueGroup } from 'bunqueue/client';

// Create group for email-related queues
const emailGroup = new QueueGroup('email');

// Add queues to group
const welcomeQueue = emailGroup.createQueue('welcome');
const notificationQueue = emailGroup.createQueue('notifications');
const digestQueue = emailGroup.createQueue('digest');

// Pause all email queues at once
await emailGroup.pauseAll();

// Resume all
await emailGroup.resumeAll();

// Get stats for all queues in group
const stats = await emailGroup.getStats();
console.log('Total pending:', stats.totalPending);
```
## Monitoring with Events

Comprehensive event monitoring.
```ts
import { Queue, Worker, QueueEvents } from 'bunqueue/client';

const queue = new Queue('tasks');
const events = new QueueEvents('tasks');

// Job lifecycle events
events.on('added', ({ jobId, name }) => {
  console.log(`Job ${jobId} (${name}) added to queue`);
});

events.on('active', ({ jobId }) => {
  console.log(`Job ${jobId} started processing`);
});

events.on('progress', ({ jobId, progress }) => {
  console.log(`Job ${jobId}: ${progress}%`);
});

events.on('completed', ({ jobId, result }) => {
  console.log(`Job ${jobId} completed:`, result);
});

events.on('failed', ({ jobId, error }) => {
  console.error(`Job ${jobId} failed:`, error);
});

events.on('stalled', ({ jobId }) => {
  console.warn(`Job ${jobId} stalled`);
});

// Worker with events
const worker = new Worker('tasks', async (job) => {
  await job.updateProgress(50);
  return { done: true };
});

worker.on('error', (error) => {
  console.error('Worker error:', error);
});
```
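The same events instance can feed lightweight metrics. A minimal in-memory counter (pushing to a real metrics backend is left as an assumption):

```ts
// Minimal in-memory metrics built on the QueueEvents instance above.
const metrics = { completed: 0, failed: 0, stalled: 0 };

events.on('completed', () => { metrics.completed++; });
events.on('failed', () => { metrics.failed++; });
events.on('stalled', () => { metrics.stalled++; });

// Log a summary every minute; swap this for a push to your metrics system.
setInterval(() => {
  console.log(
    `[metrics] completed=${metrics.completed} failed=${metrics.failed} stalled=${metrics.stalled}`
  );
}, 60_000);
```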