Cancelling jobs
Source URL: https://docs.bullmq.io/guide/workers/cancelling-jobs
The job cancellation feature allows you to gracefully cancel jobs that are currently being processed by a worker. This is implemented using the standard AbortController and AbortSignal APIs.
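As a minimal sketch of the two standard APIs this feature builds on, the snippet below uses only AbortController and AbortSignal, with no BullMQ or Redis involved. The function name `demoAbort` is made up for illustration. In a worker, the processor only sees the signal side of this pair.

```typescript
// Illustration only: plain AbortController/AbortSignal, no BullMQ involved.
// `demoAbort` is a hypothetical name used for this sketch.
function demoAbort(reason: string): string[] {
  const controller = new AbortController();
  const { signal } = controller;
  const log: string[] = [];

  // 'abort' listeners run synchronously inside controller.abort().
  signal.addEventListener('abort', () => {
    log.push(`abort event, reason: ${String(signal.reason)}`);
  });

  log.push(`before abort: aborted=${signal.aborted}`);
  controller.abort(reason);
  log.push(`after abort: aborted=${signal.aborted}`);
  return log;
}

console.log(demoAbort('User requested cancellation'));
```

The returned log shows the three stages in order: the flag is false, the listener receives the reason, and the flag is true afterwards.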
How It Works
When a worker processes a job, it can receive an optional AbortSignal as the third parameter in the processor function. This signal can be used to detect when a job has been cancelled and perform cleanup operations.
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job, token, signal) => {
  // The signal parameter is optional and provides cancellation support
  // Your job processing logic here
});

Cancelling Jobs
The Worker class provides methods to cancel jobs:
// Cancel a specific job by ID
const cancelled = worker.cancelJob('job-id-123');
console.log('Job cancelled:', cancelled); // true if job was active, false otherwise

// Cancel with a reason (useful for debugging)
worker.cancelJob('job-id-456', 'User requested cancellation');

// Cancel all active jobs
worker.cancelAllJobs();

// Cancel all with a reason
worker.cancelAllJobs('System shutdown');

// Get list of active jobs from queue
const activeJobs = await queue.getActive();
console.log(
  'Active jobs:',
  activeJobs.map(j => j.id),
);

Cancellation Reasons
When you provide a cancellation reason, it’s passed to the AbortController.abort(reason) method and can be accessed via signal.reason:
const worker = new Worker('myQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // Access the cancellation reason
      const reason = signal.reason || 'No reason provided';
      console.log(`Job ${job.id} cancelled: ${reason}`);

      reject(new Error(`Cancelled: ${reason}`));
    });

    // Your processing logic
  });
});

// Later, cancel with a descriptive reason
worker.cancelJob(job.id, 'Resource limit exceeded');

Handling Cancellation (Recommended Pattern)
The event-based approach is the recommended pattern as it provides immediate response to cancellation:
import { Worker } from 'bullmq';

const worker = new Worker('myQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    // Listen for abort event
    signal?.addEventListener('abort', () => {
      console.log(`Job ${job.id} cancellation requested`);

      // Clean up resources
      clearInterval(interval);

      // Reject with error
      reject(new Error('Job was cancelled'));
    });

    // Your processing logic
    const interval = setInterval(() => {
      // Do work
      processNextItem();
    }, 100);
  });
});

Why Event-Based?
- ✅ Immediate response - No polling delay
- ✅ More efficient - No CPU wasted checking in loops
- ✅ Cleaner code - Separation of concerns
- ✅ Standard pattern - Matches Web APIs like fetch()
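The event-based pattern can be tried without a queue. The sketch below is a hypothetical helper (`processInBackground` and its parameters are invented for this example) that does timer-driven work, stops the timer as soon as the signal fires, and also handles a signal that was aborted before the work started, a case the abort event alone does not cover.

```typescript
// Hypothetical helper: "processes" one item per tick until done or cancelled.
function processInBackground(
  totalItems: number,
  signal: AbortSignal,
  intervalMs = 10,
): Promise<number> {
  return new Promise<number>((resolve, reject) => {
    // An already-aborted signal will not fire 'abort' again, so check first.
    if (signal.aborted) {
      reject(new Error('Cancelled before start'));
      return;
    }

    let processed = 0;

    const timer = setInterval(() => {
      processed += 1; // stand-in for real work
      if (processed >= totalItems) {
        clearInterval(timer);
        signal.removeEventListener('abort', onAbort);
        resolve(processed);
      }
    }, intervalMs);

    const onAbort = (): void => {
      clearInterval(timer); // stop the work immediately
      reject(new Error(`Cancelled after ${processed} items`));
    };
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Usage: cancel roughly 35 ms into a long run.
const bgController = new AbortController();
processInBackground(1000, bgController.signal)
  .catch((err: Error) => console.log(err.message));
setTimeout(() => bgController.abort(), 35);
```

Removing the listener on normal completion keeps a long-lived signal from accumulating handlers.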
Using with Native APIs (Recommended)
Many Web APIs natively support AbortSignal. The signal is composable - you can pass it to APIs and still listen to it yourself:
const worker = new Worker('fetchQueue', async (job, token, signal) => {
  return new Promise(async (resolve, reject) => {
    // Set up abort listener - handles cancellation for the job
    signal?.addEventListener('abort', () => {
      reject(new Error('Job was cancelled'));
    });

    try {
      // Pass the SAME signal to fetch - it will abort the network request
      const response = await fetch(job.data.url, {
        signal, // ✅ Cancels the HTTP request at network level
        method: 'GET',
        headers: job.data.headers,
      });

      const data = await response.json();
      resolve(data);
    } catch (error) {
      // fetch() rejects when aborted or on network failure; forward the error
      // so it does not become an unhandled rejection (no-op if already rejected)
      reject(error);
    }
  });
});

Why this pattern is better:
- ✅ Simpler - One abort listener handles everything
- ✅ Composable - Signal passed to fetch() AND listened to in job
- ✅ The HTTP request is truly cancelled at the network level
- ✅ The job is properly marked as failed when cancelled
- ✅ No complex error checking needed
APIs That Support AbortSignal
Many modern APIs accept signal directly:
- fetch(url, { signal }) - HTTP requests
- addEventListener(event, handler, { signal }) - Auto-removes listener on abort
- Many database clients (Postgres, MongoDB drivers)
- File system operations in newer Node.js APIs
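The addEventListener entry in the list can be checked locally with a plain EventTarget. This is a small sketch, not BullMQ code; the function name and the 'tick' event name are arbitrary.

```typescript
// Sketch: a listener registered with { signal } is removed when the signal aborts.
function countEventsUntilAbort(): number {
  const target = new EventTarget();
  const controller = new AbortController();
  let calls = 0;

  target.addEventListener(
    'tick',
    () => {
      calls += 1;
    },
    { signal: controller.signal }, // ties the listener's lifetime to the signal
  );

  target.dispatchEvent(new Event('tick')); // listener runs
  controller.abort(); // listener is removed automatically
  target.dispatchEvent(new Event('tick')); // nothing is listening any more

  return calls;
}

console.log(countEventsUntilAbort()); // 1
```

This saves a manual removeEventListener call in abort handlers that subscribe to other event sources.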
Cancelling Custom Operations
For operations that don’t natively support AbortSignal, implement proper cleanup:
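The worker example in this section assumes an operation object that exposes a promise and a cancel() method. One hypothetical way to build such an object and bridge it to an AbortSignal is sketched here. The `CancellableOperation` interface, the `(value, ms)` signature, and `runWithSignal` are assumptions for illustration, with a timer standing in for the real work.

```typescript
interface CancellableOperation<T> {
  promise: Promise<T>;
  cancel: () => void;
}

// Hypothetical stand-in: the "work" is a timer that resolves with `value` after `ms`.
function startLongRunningOperation<T>(value: T, ms: number): CancellableOperation<T> {
  let cancel: () => void = () => {};
  const promise = new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => resolve(value), ms);
    cancel = () => {
      clearTimeout(timer); // actually stop the pending work
      reject(new Error('Operation cancelled'));
    };
  });
  return { promise, cancel };
}

// Bridges an optional AbortSignal to the operation's own cancel() method.
async function runWithSignal<T>(
  op: CancellableOperation<T>,
  signal?: AbortSignal,
): Promise<T> {
  const onAbort = (): void => op.cancel();
  if (signal?.aborted) op.cancel(); // cancelled before we started waiting
  signal?.addEventListener('abort', onAbort, { once: true });
  try {
    return await op.promise;
  } finally {
    signal?.removeEventListener('abort', onAbort);
  }
}
```

The key point is that cancel() releases the underlying resource (here, the timer) rather than only rejecting the promise.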
const worker = new Worker('customQueue', async (job, token, signal) => {
  // Start your operation
  const operation = startLongRunningOperation(job.data);

  // Set up cancellation handler that actually stops the operation
  signal?.addEventListener('abort', () => {
    operation.cancel(); // ✅ Actually stops the work
  });

  try {
    const result = await operation.promise;
    return result;
  } catch (error) {
    if (signal?.aborted) {
      throw new Error('Operation cancelled');
    }
    throw error;
  }
});

Async Cleanup on Cancellation
Perform cleanup operations before rejecting the promise:
const worker = new Worker('dbQueue', async (job, token, signal) => {
  // Acquire resources
  const db = await connectToDatabase();
  const cache = await connectToCache();

  return new Promise(async (resolve, reject) => {
    // Set up cleanup handler
    signal?.addEventListener('abort', async () => {
      try {
        console.log('Cleaning up resources...');

        // Close connections gracefully
        await db.close();
        await cache.disconnect();

        console.log('Cleanup complete');
        reject(new Error('Cancelled after cleanup'));
      } catch (cleanupError) {
        console.error('Cleanup failed:', cleanupError);
        reject(new Error('Cleanup failed during cancellation'));
      }
    });

    try {
      // Do your work
      const result = await processWithDatabase(db, job.data);
      await cache.set(`job:${job.id}`, result);
      resolve(result);
    } catch (error) {
      // Cleanup on error too
      await db.close();
      await cache.disconnect();
      // Reject the outer promise: a throw inside an async executor would
      // leave it pending and surface as an unhandled rejection
      reject(error);
    }
  });
});

Alternative: Polling Pattern
You can also check signal.aborted periodically (less efficient but simpler for some use cases):
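For experimenting with the polling approach outside a worker, here is a self-contained sketch. `processBatch` is a hypothetical function that doubles numbers as a stand-in for real work, and its `onProgress` callback only imitates what job.updateProgress() would do in a processor.

```typescript
// Hypothetical stand-in for a processor body: checks signal.aborted before each item.
async function processBatch(
  items: number[],
  signal?: AbortSignal,
  onProgress?: (done: number) => void,
): Promise<number[]> {
  const results: number[] = [];

  for (const item of items) {
    // Poll the flag before starting the next unit of work.
    if (signal?.aborted) {
      throw new Error(`Cancelled after processing ${results.length} items`);
    }

    // Stand-in for real async work.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
    results.push(item * 2);
    onProgress?.(results.length);
  }

  return results;
}

// Usage: cancel once two items have been processed.
const batchController = new AbortController();
processBatch([1, 2, 3, 4], batchController.signal, (done) => {
  if (done === 2) batchController.abort();
}).catch((err: Error) => console.log(err.message));
// logs "Cancelled after processing 2 items"
```

Note that cancellation is only noticed between items, so one slow item delays the response by its full duration.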
const worker = new Worker('batchQueue', async (job, token, signal) => {
  const items = job.data.items;
  const results = [];

  for (let i = 0; i < items.length; i++) {
    // Check if job has been cancelled
    if (signal?.aborted) {
      throw new Error(`Cancelled after processing ${i} items`);
    }

    const result = await processItem(items[i]);
    results.push(result);

    // Update progress
    await job.updateProgress(((i + 1) / items.length) * 100);
  }

  return { results, total: results.length };
});

Job State After Cancellation
With Regular Error (Will Retry)
When you throw a regular Error upon cancellation:
- Job state: Moves to failed
- Retries: Job WILL be retried if attempts remain
- Use case: When you want the job to be retried later
const worker = new Worker('retryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // Regular Error - job will retry if attempts remain
      reject(new Error('Cancelled, will retry'));
    });

    // Your work...
  });
});

// Set attempts when adding jobs
await queue.add('task', data, { attempts: 3 });

With UnrecoverableError (No Retry)
When you throw an UnrecoverableError:
- Job state: Moves to failed
- Retries: Job will NOT be retried
- Use case: When cancellation should be permanent
import { Worker, UnrecoverableError } from 'bullmq';

const worker = new Worker('noRetryQueue', async (job, token, signal) => {
  return new Promise((resolve, reject) => {
    signal?.addEventListener('abort', () => {
      // UnrecoverableError - no retries
      reject(new UnrecoverableError('Cancelled permanently'));
    });

    // Your work...
  });
});

Handling Lock Renewal Failures
When a worker loses its lock on a job (due to network issues, Redis problems, or long-running operations), you can gracefully handle this situation using the lockRenewalFailed event:
const worker = new Worker(
  'myQueue',
  async (job, token, signal) => {
    return new Promise(async (resolve, reject) => {
      signal?.addEventListener('abort', async () => {
        console.log('Job cancelled - cleaning up resources');
        await cleanupResources();
        reject(new Error('Job cancelled'));
      });

      // Your work...
    });
  },
  { connection },
);

// Cancel jobs when lock renewal fails
worker.on('lockRenewalFailed', (jobIds: string[]) => {
  console.log('Lock renewal failed for jobs:', jobIds);
  jobIds.forEach(jobId => worker.cancelJob(jobId));
});

{% hint style="warning" %}
Important: When a worker loses the lock on a job, it cannot move that job to the failed state (as it no longer owns the lock). Instead:
- Calling cancelJob() aborts the signal, allowing the processor to clean up resources
- The job remains in the active state temporarily
- BullMQ’s stalled job checker will detect the job and move it back to waiting
- Another worker (or the same worker) will pick it up and retry
This is the correct and intended behavior - trust BullMQ’s stalled job mechanism to handle lost locks.
{% endhint %}
Why This Pattern Works
- ✅ Immediate cleanup: The processor detects signal.aborted and can release resources
- ✅ No wasted work: The processor stops processing when it loses the lock
- ✅ Automatic recovery: The stalled job checker moves the job back to waiting
- ✅ No data loss: The job will be retried according to its attempts setting
- ✅ Works with existing infrastructure: Uses BullMQ’s built-in stalled job handling
Multi-Phase Work with Cancellation
Check cancellation at strategic points in multi-phase operations:
const worker = new Worker('multiPhaseQueue', async (job, token, signal) => {
  return new Promise(async (resolve, reject) => {
    signal?.addEventListener('abort', () => {
      reject(new Error('Cancelled'));
    });

    try {
      // Phase 1: Download
      if (signal?.aborted) throw new Error('Cancelled before download');
      const data = await downloadData(job.data.url);
      await job.updateProgress(33);

      // Phase 2: Process
      if (signal?.aborted) throw new Error('Cancelled before processing');
      const processed = await processData(data);
      await job.updateProgress(66);

      // Phase 3: Upload
      if (signal?.aborted) throw new Error('Cancelled before upload');
      const result = await uploadResults(processed);
      await job.updateProgress(100);

      resolve(result);
    } catch (error) {
      reject(error);
    }
  });
});

Backward Compatibility
The signal parameter is optional. Existing processors that don’t use it will continue to work normally:
// Old processor - still works
const worker = new Worker('myQueue', async job => {
  return await processJob(job);
});

// New processor - with cancellation support
const worker = new Worker('myQueue', async (job, token, signal) => {
  // Can now handle cancellation
});

{% hint style="info" %}
The cancellation feature is fully backward compatible. You only need to add signal handling when you want cancellation support.
{% endhint %}
Best Practices
- Use event-based cancellation for immediate response
- Clean up resources in the abort handler
- Use UnrecoverableError when cancellation should be permanent
- Combine with timeouts for better control
- Check signal.aborted at strategic points in long operations
- Handle cleanup errors gracefully to avoid leaving resources open
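The "combine with timeouts" practice is not shown elsewhere on this page, so here is one possible sketch. `withTimeout` is a hypothetical helper, not a BullMQ API: it derives a signal that aborts when the job's signal aborts or when a deadline passes, whichever comes first.

```typescript
// Hypothetical helper: derived signal = parent cancellation OR timeout.
function withTimeout(
  parent: AbortSignal | undefined,
  ms: number,
): { signal: AbortSignal; dispose: () => void } {
  const controller = new AbortController();

  // Forward the parent's reason so downstream code sees why it was cancelled.
  const onParentAbort = (): void => controller.abort(parent?.reason);

  const timer = setTimeout(
    () => controller.abort(new Error(`Timed out after ${ms} ms`)),
    ms,
  );

  // Releases the timer and the parent listener; call it when the work finishes.
  const dispose = (): void => {
    clearTimeout(timer);
    parent?.removeEventListener('abort', onParentAbort);
  };

  // Clean up automatically once the derived signal fires for any reason.
  controller.signal.addEventListener('abort', dispose, { once: true });

  if (parent?.aborted) {
    onParentAbort();
  } else {
    parent?.addEventListener('abort', onParentAbort, { once: true });
  }

  return { signal: controller.signal, dispose };
}
```

Inside a processor, the derived signal could be passed to fetch() or checked in a loop in place of the original one, with dispose() called in a finally block. Recent runtimes also provide AbortSignal.timeout() and AbortSignal.any(), which cover similar ground where available.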