Queue Management
Redis-backed job queues with BullMQ
Installation
bun add @dwex/bullmq bullmqBasic Setup
Configure the BullMQ module with Redis connection:
import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
@Module({
imports: [
BullMQModule.forRoot({
connection: {
host: "localhost",
port: 6379,
},
}),
],
})
export class AppModule {}Quick Start
1. Register a Queue
Create a module that registers your queues:
import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";
@Module({
imports: [BullMQModule.registerQueue({ name: "emails" })],
providers: [EmailService],
exports: [EmailService],
})
export class EmailModule {}2. Create a Service to Add Jobs
Use getQueueToken() to inject the queue:
import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";
@Injectable()
export class EmailService {
constructor(@Inject(getQueueToken("emails")) private emailQueue: Queue) {}
async sendWelcomeEmail(email: string, name: string) {
const job = await this.emailQueue.add("welcome", {
to: email,
subject: `Welcome ${name}!`,
body: `Hi ${name}, welcome to our application!`,
});
return {
jobId: job.id,
message: "Welcome email queued successfully",
};
}
}3. Create a Processor
Process jobs by extending WorkerHost:
import { Processor, WorkerHost, OnWorkerEvent } from "@dwex/bullmq";
import type { Job } from "bullmq";
interface EmailData {
to: string;
subject: string;
body: string;
}
@Processor("emails", { concurrency: 5 })
export class EmailProcessor extends WorkerHost<EmailData> {
async process(job: Job<EmailData>) {
const { to, subject, body } = job.data;
console.log(`Processing email job ${job.id}`);
// Your email sending logic here
await this.sendEmail(to, subject, body);
return { sent: true, timestamp: Date.now() };
}
@OnWorkerEvent("completed")
onCompleted(job: Job, result: any) {
console.log(`Email sent successfully: ${job.id}`, result);
}
@OnWorkerEvent("failed")
onFailed(job: Job, error: Error) {
console.error(`Email failed: ${job.id}`, error);
}
private async sendEmail(to: string, subject: string, body: string) {
// Your email sending implementation
console.log(`Sending email to ${to}: ${subject}`);
}
}4. Register the Processor
Add the processor to your module:
import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";
import { EmailProcessor } from "./email.processor";
@Module({
imports: [
BullMQModule.registerQueue({ name: "emails" }),
BullMQModule.registerProcessors(EmailProcessor),
],
providers: [EmailService],
exports: [EmailService],
})
export class EmailModule {}Configuration
Global Configuration
Set default options for all queues:
BullMQModule.forRoot({
connection: {
host: "localhost",
port: 6379,
password: process.env.REDIS_PASSWORD,
},
prefix: "myapp", // Prefix for all queue keys
defaultJobOptions: {
attempts: 3,
backoff: {
type: "exponential",
delay: 1000,
},
removeOnComplete: true,
removeOnFail: false,
},
});Async Configuration
Load configuration from a service:
BullMQModule.forRootAsync({
useFactory: (configService: ConfigService) => ({
connection: {
host: configService.get("REDIS_HOST"),
port: configService.get("REDIS_PORT"),
password: configService.get("REDIS_PASSWORD"),
},
}),
inject: [ConfigService],
});Queue-Specific Options
Configure individual queues:
BullMQModule.registerQueue(
{
name: "emails",
options: {
defaultJobOptions: {
attempts: 5,
backoff: {
type: "exponential",
delay: 2000,
},
},
},
},
{
name: "notifications",
options: {
defaultJobOptions: {
attempts: 3,
priority: 10,
},
},
}
);Job Options
Priority
Set job priority (lower numbers = higher priority):
await this.emailQueue.add(
"welcome",
{ to: "user@example.com" },
{ priority: 1 } // High priority
);Delays
Delay job execution:
await this.emailQueue.add(
"reminder",
{ message: "Your trial ends soon" },
{ delay: 24 * 60 * 60 * 1000 } // 24 hours
);Retries and Backoff
Configure retry behavior:
await this.emailQueue.add(
"important",
{ data: "..." },
{
attempts: 5,
backoff: {
type: "exponential",
delay: 1000,
},
}
);Job Removal
Control when jobs are removed:
await this.emailQueue.add(
"data",
{ payload: "..." },
{
removeOnComplete: true, // Remove when completed
removeOnFail: false, // Keep failed jobs for debugging
}
);Worker Events
Handle worker events with decorators:
@Processor("emails")
export class EmailProcessor extends WorkerHost {
async process(job: Job) {
// Process job
return { success: true };
}
@OnWorkerEvent("active")
onActive(job: Job) {
console.log(`Job ${job.id} started`);
}
@OnWorkerEvent("completed")
onCompleted(job: Job, result: any) {
console.log(`Job ${job.id} completed:`, result);
}
@OnWorkerEvent("failed")
onFailed(job: Job, error: Error) {
console.error(`Job ${job.id} failed:`, error.message);
}
@OnWorkerEvent("progress")
onProgress(job: Job, progress: number) {
console.log(`Job ${job.id} progress: ${progress}%`);
}
@OnWorkerEvent("stalled")
onStalled(jobId: string) {
console.warn(`Job ${jobId} stalled`);
}
@OnWorkerEvent("error")
onError(error: Error) {
console.error("Worker error:", error);
}
}Available Events
active- Job starts processingcompleted- Job completes successfullyfailed- Job failsprogress- Job reports progressstalled- Job becomes stalleddrained- Queue is emptyerror- Worker error occursready- Worker is readyclosed- Worker is closedpaused- Worker is pausedresumed- Worker is resumed
Job Progress
Report progress during job processing:
@Processor("video-encoding")
export class VideoProcessor extends WorkerHost {
async process(job: Job) {
await job.updateProgress(0);
// Step 1: Download video
await this.downloadVideo(job.data.url);
await job.updateProgress(25);
// Step 2: Encode video
await this.encodeVideo(job.data);
await job.updateProgress(50);
// Step 3: Upload result
await this.uploadVideo(job.data);
await job.updateProgress(75);
// Step 4: Cleanup
await this.cleanup(job.data);
await job.updateProgress(100);
return { encoded: true };
}
@OnWorkerEvent("progress")
onProgress(job: Job, progress: number) {
console.log(`Video encoding: ${progress}%`);
}
}Queue Statistics
Monitor queue health:
import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";
@Injectable()
export class QueueMonitorService {
constructor(@Inject(getQueueToken("emails")) private queue: Queue) {}
async getStats() {
const [waiting, active, completed, failed] = await Promise.all([
this.queue.getWaitingCount(),
this.queue.getActiveCount(),
this.queue.getCompletedCount(),
this.queue.getFailedCount(),
]);
return {
waiting,
active,
completed,
failed,
total: waiting + active + completed + failed,
};
}
async getJobs() {
return {
waiting: await this.queue.getWaiting(),
active: await this.queue.getActive(),
completed: await this.queue.getCompleted(),
failed: await this.queue.getFailed(),
};
}
}Flow Producers
Create complex job workflows with dependencies:
import { Injectable, Inject } from "@dwex/core";
import { getFlowProducerToken } from "@dwex/bullmq";
import type { FlowProducer } from "bullmq";
@Injectable()
export class UserWorkflowService {
constructor(
@Inject(getFlowProducerToken("user-workflows")) private flow: FlowProducer
) {}
async createUserWorkflow(userId: string) {
await this.flow.add({
name: "create-user",
queueName: "users",
data: { userId },
children: [
{
name: "send-welcome-email",
queueName: "emails",
data: { userId },
},
{
name: "setup-profile",
queueName: "profiles",
data: { userId },
},
],
});
}
}Register the flow producer:
@Module({
imports: [
BullMQModule.registerFlowProducer({
name: "user-workflows",
}),
],
})
export class UserModule {}Best Practices
1. Use Appropriate Concurrency
// High concurrency for lightweight tasks
@Processor("notifications", { concurrency: 50 })
export class NotificationProcessor extends WorkerHost {
// ...
}
// Low concurrency for heavy tasks
@Processor("video-processing", { concurrency: 2 })
export class VideoProcessor extends WorkerHost {
// ...
}2. Handle Job Failures Gracefully
@Processor("payments")
export class PaymentProcessor extends WorkerHost {
async process(job: Job) {
try {
await this.processPayment(job.data);
return { success: true };
} catch (error) {
// Log the error
console.error(`Payment failed for job ${job.id}:`, error);
// Throw to trigger retry
throw error;
}
}
@OnWorkerEvent("failed")
async onFailed(job: Job, error: Error) {
// Alert team if all retries exhausted
if (job.attemptsMade >= job.opts.attempts) {
await this.alertTeam(`Payment job ${job.id} failed permanently`);
}
}
}3. Clean Up Old Jobs
BullMQModule.forRoot({
connection: { host: "localhost", port: 6379 },
defaultJobOptions: {
removeOnComplete: {
age: 24 * 3600, // Keep completed jobs for 24 hours
count: 1000, // Keep max 1000 completed jobs
},
removeOnFail: {
age: 7 * 24 * 3600, // Keep failed jobs for 7 days
},
},
});4. Use Job IDs for Deduplication
await this.emailQueue.add(
"welcome",
{ to: "user@example.com" },
{
jobId: `welcome-${userId}`, // Prevent duplicate jobs
removeOnComplete: true,
}
);5. Monitor Queue Health
@Injectable()
export class QueueHealthService {
constructor(@Inject(getQueueToken("emails")) private queue: Queue) {}
async checkHealth() {
const failed = await this.queue.getFailedCount();
const active = await this.queue.getActiveCount();
// Alert if too many failures
if (failed > 100) {
await this.alertTeam(`High failure rate: ${failed} failed jobs`);
}
// Alert if queue is stuck
if (active === 0 && (await this.queue.getWaitingCount()) > 50) {
await this.alertTeam("Queue appears to be stuck");
}
}
}Example: Complete Email Queue System
// email.module.ts
import { Module } from "@dwex/core";
import { BullMQModule } from "@dwex/bullmq";
import { EmailService } from "./email.service";
import { EmailProcessor } from "./email.processor";
@Module({
imports: [
BullMQModule.registerQueue({ name: "emails" }),
BullMQModule.registerProcessors(EmailProcessor),
],
providers: [EmailService],
exports: [EmailService],
})
export class EmailModule {}
// email.service.ts
import { Injectable, Inject } from "@dwex/core";
import { getQueueToken } from "@dwex/bullmq";
import type { Queue } from "bullmq";
@Injectable()
export class EmailService {
constructor(@Inject(getQueueToken("emails")) private emailQueue: Queue) {}
async sendWelcomeEmail(email: string, name: string) {
return await this.emailQueue.add("welcome", {
to: email,
subject: `Welcome ${name}!`,
body: `Hi ${name}, welcome to our application!`,
});
}
async sendNotification(email: string, message: string) {
return await this.emailQueue.add("notification", {
to: email,
subject: "Notification",
body: message,
});
}
async getQueueStats() {
const [waiting, active, completed, failed] = await Promise.all([
this.emailQueue.getWaitingCount(),
this.emailQueue.getActiveCount(),
this.emailQueue.getCompletedCount(),
this.emailQueue.getFailedCount(),
]);
return { waiting, active, completed, failed };
}
}
// email.processor.ts
import { Processor, WorkerHost, OnWorkerEvent } from "@dwex/bullmq";
import type { Job } from "bullmq";
interface EmailData {
to: string;
subject: string;
body: string;
}
@Processor("emails", { concurrency: 5 })
export class EmailProcessor extends WorkerHost<EmailData> {
async process(job: Job<EmailData>): Promise<{ sent: boolean }> {
const { to, subject, body } = job.data;
console.log(`Processing email job ${job.id}`);
// Simulate email sending
await new Promise((resolve) => setTimeout(resolve, 1000));
return { sent: true };
}
@OnWorkerEvent("completed")
onCompleted(job: Job, result: any) {
console.log(`Email sent successfully: ${job.id}`, result);
}
@OnWorkerEvent("failed")
onFailed(job: Job, error: Error) {
console.error(`Email failed: ${job.id}`, error);
}
}