Worker
The Worker class processes jobs from a queue.
Creating a Worker
import { Worker } from 'bunqueue/client';

const worker = new Worker('my-queue', async (job) => {
  // Process the job
  return { success: true };
});

Options
const worker = new Worker('queue', processor, {
  concurrency: 5,           // Process 5 jobs in parallel (default: 1)
  autorun: true,            // Start automatically (default: true)
  heartbeatInterval: 10000, // Heartbeat every 10s (default: 10000)
});

Job Object
Inside the processor, you have access to the job object:
const worker = new Worker('queue', async (job) => {
  job.id;           // Job ID
  job.name;         // Job name
  job.data;         // Job data
  job.queueName;    // Queue name
  job.attemptsMade; // Current attempt number
  job.timestamp;    // When job was created
  job.progress;     // Current progress (0-100)

  // Update progress
  await job.updateProgress(50, 'Halfway done');

  // Log messages
  await job.log('Processing step 1');

  // The returned value is passed to the 'completed' event as the result
  return { success: true };
});

Events
worker.on('ready', () => {
  console.log('Worker is ready');
});

worker.on('active', (job) => {
  console.log(`Started: ${job.id}`);
});

worker.on('completed', (job, result) => {
  console.log(`Completed: ${job.id}`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Failed: ${job.id}`, error.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Progress: ${job.id} - ${progress}%`);
});

worker.on('error', (error) => {
  console.error('Worker error:', error);
});

worker.on('closed', () => {
  console.log('Worker closed');
});

Control
// Pause processing
worker.pause();

// Resume processing
worker.resume();

// Stop the worker
await worker.close();     // Wait for active jobs
await worker.close(true); // Force close immediately

Heartbeats
Workers automatically send heartbeats while processing jobs. This enables stall detection: if a job doesn’t receive a heartbeat within the configured interval, it’s considered stalled.
const worker = new Worker('queue', processor, {
  heartbeatInterval: 5000, // Send heartbeat every 5 seconds
});

See Stall Detection for more details.
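
The stall rule described above can be illustrated with a small standalone sketch. It does not use bunqueue and is not its implementation: `ActiveJob`, `lastHeartbeatAt`, and `findStalledJobs` are hypothetical names, and the real stall detector may apply extra grace periods or retry limits.

```typescript
// Hypothetical model of a job being processed; not a bunqueue type.
interface ActiveJob {
  id: string;
  lastHeartbeatAt: number; // Epoch milliseconds of the most recent heartbeat
}

// Returns the IDs of jobs whose last heartbeat is older than the interval.
function findStalledJobs(
  jobs: ActiveJob[],
  now: number,
  heartbeatInterval: number,
): string[] {
  return jobs
    .filter((job) => now - job.lastHeartbeatAt > heartbeatInterval)
    .map((job) => job.id);
}

const now = 100_000;
const stalled = findStalledJobs(
  [
    { id: 'a', lastHeartbeatAt: 98_000 }, // 2s ago: still healthy
    { id: 'b', lastHeartbeatAt: 90_000 }, // 10s ago: missed the 5s window
  ],
  now,
  5000,
);
console.log(stalled); // Only job "b" is reported
```

A shorter `heartbeatInterval` detects stalls sooner at the cost of more heartbeat traffic.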
Error Handling
const worker = new Worker('queue', async (job) => {
  try {
    await riskyOperation();
  } catch (error) {
    // Job will be retried if attempts remain
    throw error;
  }
});

// Or handle at worker level
worker.on('failed', (job, error) => {
  if (job.attemptsMade >= 3) {
    // Final failure - alert someone
    alertOps(job, error);
  }
});

SandboxedWorker
For CPU-intensive tasks or jobs that might crash, use SandboxedWorker to run processors in isolated Bun Worker processes.
import { SandboxedWorker } from 'bunqueue/client';

const worker = new SandboxedWorker('cpu-intensive', {
  processor: './processor.ts', // Path to processor file
  concurrency: 4,              // 4 parallel worker processes
  timeout: 60000,              // 60s timeout per job (default: 30000)
  maxMemory: 256,              // MB per worker (default: 256)
  maxRestarts: 10,             // Auto-restart limit (default: 10)
  autoRestart: true,           // Auto-restart crashed workers (default: true)
  pollInterval: 10,            // Job poll interval in ms (default: 10)
});

worker.start();

Processor file (processor.ts):
export default async (job: {
  id: string;
  data: any;
  queue: string;
  attempts: number;
  progress: (value: number) => void;
}) => {
  job.progress(50);
  const result = await heavyComputation(job.data);
  job.progress(100);
  return result;
};

When to Use SandboxedWorker

| Use Case | Worker | SandboxedWorker |
|---|---|---|
| Fast I/O tasks | ✅ | ❌ |
| CPU-intensive | ❌ | ✅ |
| Untrusted code | ❌ | ✅ |
| Memory leak protection | ❌ | ✅ |
| Crash isolation | ❌ | ✅ |
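
The table can be read as a simple rule: any need for isolation points to SandboxedWorker, and everything else stays on Worker. The sketch below encodes that reading as a helper. `JobTraits` and `chooseWorkerKind` are hypothetical names used for illustration only and are not part of bunqueue.

```typescript
// Hypothetical description of a workload; each flag mirrors a table row.
interface JobTraits {
  cpuIntensive?: boolean;
  untrustedCode?: boolean;
  mayLeakMemory?: boolean;
  mayCrash?: boolean;
}

type WorkerKind = 'Worker' | 'SandboxedWorker';

// Any trait that benefits from process isolation selects SandboxedWorker;
// plain fast I/O work (no flags set) stays on the in-process Worker.
function chooseWorkerKind(traits: JobTraits): WorkerKind {
  const needsIsolation = Boolean(
    traits.cpuIntensive ||
      traits.untrustedCode ||
      traits.mayLeakMemory ||
      traits.mayCrash,
  );
  return needsIsolation ? 'SandboxedWorker' : 'Worker';
}

console.log(chooseWorkerKind({}));                     // Worker
console.log(chooseWorkerKind({ cpuIntensive: true })); // SandboxedWorker
```
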
SandboxedWorker API
// Start the worker pool
worker.start();

// Get stats
const stats = worker.getStats();
// { total: 4, busy: 2, idle: 2, restarts: 0 }

// Graceful shutdown
await worker.stop();
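
As one possible use of the stats object, the sketch below derives a utilization figure from a value shaped like the example output of `getStats()`. The `PoolStats` interface is inferred from that example comment, and `summarizeStats` is a hypothetical helper, not a bunqueue function.

```typescript
// Shape assumed from the example getStats() output above.
interface PoolStats {
  total: number;
  busy: number;
  idle: number;
  restarts: number;
}

// Computes the fraction of busy workers and whether the pool has no idle capacity.
function summarizeStats(stats: PoolStats): { utilization: number; saturated: boolean } {
  const utilization = stats.total === 0 ? 0 : stats.busy / stats.total;
  const saturated = stats.total > 0 && stats.idle === 0;
  return { utilization, saturated };
}

const summary = summarizeStats({ total: 4, busy: 2, idle: 2, restarts: 0 });
console.log(summary.utilization); // 0.5
console.log(summary.saturated);   // false
```

A pool that stays saturated for long periods may benefit from a higher `concurrency`, while a rising `restarts` count suggests processors are crashing or exceeding their limits.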