Queue
The Queue class is used to add and manage jobs.
Creating a Queue
import { Queue } from 'bunqueue/client';
// Basic queueconst queue = new Queue('my-queue');
// Typed queueinterface TaskData { userId: number; action: string;}const typedQueue = new Queue<TaskData>('tasks');
// With default optionsconst queue = new Queue('emails', { defaultJobOptions: { attempts: 3, backoff: 1000, removeOnComplete: true, }});Adding Jobs
Single Job
const job = await queue.add('job-name', { key: 'value' });
// With optionsconst job = await queue.add('job-name', data, { priority: 10, // Higher priority = processed first delay: 5000, // Delay in ms before processing attempts: 5, // Max retry attempts backoff: 2000, // Backoff between retries (ms) timeout: 30000, // Job timeout (ms) jobId: 'custom-id', // Custom job ID removeOnComplete: true, // Remove job data after completion removeOnFail: false, // Keep failed jobs});Bulk Add
// Batch optimized - single lock, batch INSERTconst jobs = await queue.addBulk([ { name: 'task-1', data: { id: 1 } }, { name: 'task-2', data: { id: 2 }, opts: { priority: 10 } }, { name: 'task-3', data: { id: 3 }, opts: { delay: 5000 } },]);Repeatable Jobs
// Repeat every 5 secondsawait queue.add('heartbeat', {}, { repeat: { every: 5000, }});
// Repeat with limitawait queue.add('daily-report', {}, { repeat: { every: 86400000, // 24 hours limit: 30, // Max 30 repetitions }});
// Cron pattern (use server mode for cron)await queue.add('weekly', {}, { repeat: { pattern: '0 9 * * MON', // Every Monday at 9am }});Query Operations
// Get job by IDconst job = await queue.getJob('job-id');
// Get job countsconst counts = queue.getJobCounts();// { waiting: 10, active: 2, completed: 100, failed: 3 }Queue Control
// Pause processing (workers stop pulling)queue.pause();
// Resume processingqueue.resume();
// Remove all waiting jobsqueue.drain();
// Remove all queue dataqueue.obliterate();
// Remove a specific jobqueue.remove('job-id');Stall Configuration
// Configure stall detectionqueue.setStallConfig({ enabled: true, stallInterval: 30000, // 30 seconds maxStalls: 3, // Move to DLQ after 3 stalls gracePeriod: 5000, // 5 second grace period});
// Get current configconst config = queue.getStallConfig();See Stall Detection for more details.
DLQ Operations
// Configure DLQqueue.setDlqConfig({ autoRetry: true, autoRetryInterval: 3600000, // 1 hour maxAutoRetries: 3, maxAge: 604800000, // 7 days});
// Get current DLQ configconst dlqConfig = queue.getDlqConfig();
// Get DLQ entriesconst entries = queue.getDlq();
// Filter entriesconst stalledJobs = queue.getDlq({ reason: 'stalled' });const recentFails = queue.getDlq({ newerThan: Date.now() - 3600000 });
// Get statsconst stats = queue.getDlqStats();// { total, byReason, pendingRetry, expired, oldestEntry, newestEntry }
// Retry from DLQqueue.retryDlq(); // Retry allqueue.retryDlq('job-id'); // Retry specific
// Retry by filterqueue.retryDlqByFilter({ reason: 'timeout', limit: 10 });
// Purge DLQqueue.purgeDlq();See Dead Letter Queue for more details.
Closing
// Close the queue (no-op in embedded mode)await queue.close();