Dwex Logo

Queue Management

Redis-backed job queues with BullMQ

Installation

bun add @dwex/bullmq bullmq

Basic Setup

Configure the BullMQ module with Redis connection:

import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";

@Module({
	imports: [
		BullMQModule.forRoot({
			connection: {
				host: "localhost",
				port: 6379,
			},
		}),
	],
})
export class AppModule {}

Quick Start

1. Register a Queue

Create a module that registers your queues:

import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";

@Module({
	imports: [BullMQModule.registerQueue({ name: "emails" })],
	providers: [EmailService],
	exports: [EmailService],
})
export class EmailModule {}

2. Create a Service to Add Jobs

Use getQueueToken() to inject the queue:

import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";

@Injectable()
export class EmailService {
	constructor(@Inject(getQueueToken("emails")) private emailQueue: Queue) {}

	async sendWelcomeEmail(email: string, name: string) {
		const job = await this.emailQueue.add("welcome", {
			to: email,
			subject: `Welcome ${name}!`,
			body: `Hi ${name}, welcome to our application!`,
		});

		return {
			jobId: job.id,
			message: "Welcome email queued successfully",
		};
	}
}

3. Create a Processor

Process jobs by extending WorkerHost:

import { Processor, WorkerHost, OnWorkerEvent } from "@dwex/bullmq";
import type { Job } from "bullmq";

interface EmailData {
	to: string;
	subject: string;
	body: string;
}

@Processor("emails", { concurrency: 5 })
export class EmailProcessor extends WorkerHost<EmailData> {
	async process(job: Job<EmailData>) {
		const { to, subject, body } = job.data;

		console.log(`Processing email job ${job.id}`);
		// Your email sending logic here
		await this.sendEmail(to, subject, body);

		return { sent: true, timestamp: Date.now() };
	}

	@OnWorkerEvent("completed")
	onCompleted(job: Job, result: any) {
		console.log(`Email sent successfully: ${job.id}`, result);
	}

	@OnWorkerEvent("failed")
	onFailed(job: Job, error: Error) {
		console.error(`Email failed: ${job.id}`, error);
	}

	private async sendEmail(to: string, subject: string, body: string) {
		// Your email sending implementation
		console.log(`Sending email to ${to}: ${subject}`);
	}
}

4. Register the Processor

Add the processor to your module:

import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";
import { EmailProcessor } from "./email.processor";

@Module({
	imports: [
		BullMQModule.registerQueue({ name: "emails" }),
		BullMQModule.registerProcessors(EmailProcessor),
	],
	providers: [EmailService],
	exports: [EmailService],
})
export class EmailModule {}

Configuration

Global Configuration

Set default options for all queues:

BullMQModule.forRoot({
	connection: {
		host: "localhost",
		port: 6379,
		password: process.env.REDIS_PASSWORD,
	},
	prefix: "myapp", // Prefix for all queue keys
	defaultJobOptions: {
		attempts: 3,
		backoff: {
			type: "exponential",
			delay: 1000,
		},
		removeOnComplete: true,
		removeOnFail: false,
	},
});

Async Configuration

Load configuration from a service:

BullMQModule.forRootAsync({
	useFactory: (configService: ConfigService) => ({
		connection: {
			host: configService.get("REDIS_HOST"),
			port: configService.get("REDIS_PORT"),
			password: configService.get("REDIS_PASSWORD"),
		},
	}),
	inject: [ConfigService],
});

Queue-Specific Options

Configure individual queues:

BullMQModule.registerQueue(
	{
		name: "emails",
		options: {
			defaultJobOptions: {
				attempts: 5,
				backoff: {
					type: "exponential",
					delay: 2000,
				},
			},
		},
	},
	{
		name: "notifications",
		options: {
			defaultJobOptions: {
				attempts: 3,
				priority: 10,
			},
		},
	}
);

Job Options

Priority

Set job priority (lower numbers = higher priority):

await this.emailQueue.add(
	"welcome",
	{ to: "user@example.com" },
	{ priority: 1 } // High priority
);

Delays

Delay job execution:

await this.emailQueue.add(
	"reminder",
	{ message: "Your trial ends soon" },
	{ delay: 24 * 60 * 60 * 1000 } // 24 hours
);

Retries and Backoff

Configure retry behavior:

await this.emailQueue.add(
	"important",
	{ data: "..." },
	{
		attempts: 5,
		backoff: {
			type: "exponential",
			delay: 1000,
		},
	}
);

Job Removal

Control when jobs are removed:

await this.emailQueue.add(
	"data",
	{ payload: "..." },
	{
		removeOnComplete: true, // Remove when completed
		removeOnFail: false, // Keep failed jobs for debugging
	}
);

Worker Events

Handle worker events with decorators:

@Processor("emails")
export class EmailProcessor extends WorkerHost {
	async process(job: Job) {
		// Process job
		return { success: true };
	}

	@OnWorkerEvent("active")
	onActive(job: Job) {
		console.log(`Job ${job.id} started`);
	}

	@OnWorkerEvent("completed")
	onCompleted(job: Job, result: any) {
		console.log(`Job ${job.id} completed:`, result);
	}

	@OnWorkerEvent("failed")
	onFailed(job: Job, error: Error) {
		console.error(`Job ${job.id} failed:`, error.message);
	}

	@OnWorkerEvent("progress")
	onProgress(job: Job, progress: number) {
		console.log(`Job ${job.id} progress: ${progress}%`);
	}

	@OnWorkerEvent("stalled")
	onStalled(jobId: string) {
		console.warn(`Job ${jobId} stalled`);
	}

	@OnWorkerEvent("error")
	onError(error: Error) {
		console.error("Worker error:", error);
	}
}

Available Events

  • active - Job starts processing
  • completed - Job completes successfully
  • failed - Job fails
  • progress - Job reports progress
  • stalled - Job becomes stalled
  • drained - Queue is empty
  • error - Worker error occurs
  • ready - Worker is ready
  • closed - Worker is closed
  • paused - Worker is paused
  • resumed - Worker is resumed

Job Progress

Report progress during job processing:

@Processor("video-encoding")
export class VideoProcessor extends WorkerHost {
	async process(job: Job) {
		await job.updateProgress(0);

		// Step 1: Download video
		await this.downloadVideo(job.data.url);
		await job.updateProgress(25);

		// Step 2: Encode video
		await this.encodeVideo(job.data);
		await job.updateProgress(50);

		// Step 3: Upload result
		await this.uploadVideo(job.data);
		await job.updateProgress(75);

		// Step 4: Cleanup
		await this.cleanup(job.data);
		await job.updateProgress(100);

		return { encoded: true };
	}

	@OnWorkerEvent("progress")
	onProgress(job: Job, progress: number) {
		console.log(`Video encoding: ${progress}%`);
	}
}

Queue Statistics

Monitor queue health:

import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";

@Injectable()
export class QueueMonitorService {
	constructor(@Inject(getQueueToken("emails")) private queue: Queue) {}

	async getStats() {
		const [waiting, active, completed, failed] = await Promise.all([
			this.queue.getWaitingCount(),
			this.queue.getActiveCount(),
			this.queue.getCompletedCount(),
			this.queue.getFailedCount(),
		]);

		return {
			waiting,
			active,
			completed,
			failed,
			total: waiting + active + completed + failed,
		};
	}

	async getJobs() {
		return {
			waiting: await this.queue.getWaiting(),
			active: await this.queue.getActive(),
			completed: await this.queue.getCompleted(),
			failed: await this.queue.getFailed(),
		};
	}
}

Flow Producers

Create complex job workflows with dependencies:

import { Injectable, Inject } from "@dwex/core";
import { getFlowProducerToken } from "@dwex/bullmq";
import type { FlowProducer } from "bullmq";

@Injectable()
export class UserWorkflowService {
	constructor(
		@Inject(getFlowProducerToken("user-workflows")) private flow: FlowProducer
	) {}

	async createUserWorkflow(userId: string) {
		await this.flow.add({
			name: "create-user",
			queueName: "users",
			data: { userId },
			children: [
				{
					name: "send-welcome-email",
					queueName: "emails",
					data: { userId },
				},
				{
					name: "setup-profile",
					queueName: "profiles",
					data: { userId },
				},
			],
		});
	}
}

Register the flow producer:

@Module({
	imports: [
		BullMQModule.registerFlowProducer({
			name: "user-workflows",
		}),
	],
})
export class UserModule {}

Best Practices

1. Use Appropriate Concurrency

// High concurrency for lightweight tasks
@Processor("notifications", { concurrency: 50 })
export class NotificationProcessor extends WorkerHost {
	// ...
}

// Low concurrency for heavy tasks
@Processor("video-processing", { concurrency: 2 })
export class VideoProcessor extends WorkerHost {
	// ...
}

2. Handle Job Failures Gracefully

@Processor("payments")
export class PaymentProcessor extends WorkerHost {
	async process(job: Job) {
		try {
			await this.processPayment(job.data);
			return { success: true };
		} catch (error) {
			// Log the error
			console.error(`Payment failed for job ${job.id}:`, error);

			// Throw to trigger retry
			throw error;
		}
	}

	@OnWorkerEvent("failed")
	async onFailed(job: Job, error: Error) {
		// Alert team if all retries exhausted
		if (job.attemptsMade >= job.opts.attempts) {
			await this.alertTeam(`Payment job ${job.id} failed permanently`);
		}
	}
}

3. Clean Up Old Jobs

BullMQModule.forRoot({
	connection: { host: "localhost", port: 6379 },
	defaultJobOptions: {
		removeOnComplete: {
			age: 24 * 3600, // Keep completed jobs for 24 hours
			count: 1000, // Keep max 1000 completed jobs
		},
		removeOnFail: {
			age: 7 * 24 * 3600, // Keep failed jobs for 7 days
		},
	},
});

4. Use Job IDs for Deduplication

await this.emailQueue.add(
	"welcome",
	{ to: "user@example.com" },
	{
		jobId: `welcome-${userId}`, // Prevent duplicate jobs
		removeOnComplete: true,
	}
);

5. Monitor Queue Health

@Injectable()
export class QueueHealthService {
	constructor(@Inject(getQueueToken("emails")) private queue: Queue) {}

	async checkHealth() {
		const failed = await this.queue.getFailedCount();
		const active = await this.queue.getActiveCount();

		// Alert if too many failures
		if (failed > 100) {
			await this.alertTeam(`High failure rate: ${failed} failed jobs`);
		}

		// Alert if queue is stuck
		if (active === 0 && (await this.queue.getWaitingCount()) > 50) {
			await this.alertTeam("Queue appears to be stuck");
		}
	}
}

Example: Complete Email Queue System

// email.module.ts
import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";
import { EmailProcessor } from "./email.processor";

@Module({
	imports: [
		BullMQModule.registerQueue({ name: "emails" }),
		BullMQModule.registerProcessors(EmailProcessor),
	],
	providers: [EmailService],
	exports: [EmailService],
})
export class EmailModule {}

// email.service.ts
import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";

@Injectable()
export class EmailService {
	constructor(@Inject(getQueueToken("emails")) private emailQueue: Queue) {}

	async sendWelcomeEmail(email: string, name: string) {
		return await this.emailQueue.add("welcome", {
			to: email,
			subject: `Welcome ${name}!`,
			body: `Hi ${name}, welcome to our application!`,
		});
	}

	async sendNotification(email: string, message: string) {
		return await this.emailQueue.add("notification", {
			to: email,
			subject: "Notification",
			body: message,
		});
	}

	async getQueueStats() {
		const [waiting, active, completed, failed] = await Promise.all([
			this.emailQueue.getWaitingCount(),
			this.emailQueue.getActiveCount(),
			this.emailQueue.getCompletedCount(),
			this.emailQueue.getFailedCount(),
		]);

		return { waiting, active, completed, failed };
	}
}

// email.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from "@dwex/bullmq";
import type { Job } from "bullmq";

interface EmailData {
	to: string;
	subject: string;
	body: string;
}

@Processor("emails", { concurrency: 5 })
export class EmailProcessor extends WorkerHost<EmailData> {
	async process(job: Job<EmailData>): Promise<{ sent: boolean }> {
		const { to, subject, body } = job.data;

		console.log(`Processing email job ${job.id}`);

		// Simulate email sending
		await new Promise((resolve) => setTimeout(resolve, 1000));

		return { sent: true };
	}

	@OnWorkerEvent("completed")
	onCompleted(job: Job, result: any) {
		console.log(`Email sent successfully: ${job.id}`, result);
	}

	@OnWorkerEvent("failed")
	onFailed(job: Job, error: Error) {
		console.error(`Email failed: ${job.id}`, error);
	}
}

Next Steps