BullMQ
Introduction

BullMQ is a Redis-backed job queue for Node.js. Redis stores each job's details, configuration, and execution history, and jobs are distinguished by their job ID.
A queue lets you break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU-intensive work such as audio transcoding, you can delegate the task to other processes, freeing user-facing processes to remain responsive.
It uses a publisher-subscriber mechanism. The producer creates a job with a unique ID and a data payload and adds it to the queue. The consumer subscribes to the queue in order to receive the job and run the processing logic.
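To make the producer and consumer roles concrete, here is a minimal in-memory sketch. It is not BullMQ code: `InMemoryQueue` and its methods are hypothetical names used only for illustration, and BullMQ keeps the equivalent state in Redis rather than in process memory.

```typescript
// Hypothetical in-memory stand-in for a queue; BullMQ stores this state in Redis.
type SketchJob<T> = { id: string; name: string; data: T };
type Handler<T> = (job: SketchJob<T>) => void;

class InMemoryQueue<T> {
  private waiting: SketchJob<T>[] = [];
  private handler?: Handler<T>;
  private nextId = 1;

  // Producer side: wrap the data in a job with a unique id and enqueue it.
  add(name: string, data: T): SketchJob<T> {
    const job: SketchJob<T> = { id: String(this.nextId++), name, data };
    this.waiting.push(job);
    this.drain();
    return job;
  }

  // Consumer side: subscribe a handler; jobs that are already waiting are delivered first.
  subscribe(handler: Handler<T>): void {
    this.handler = handler;
    this.drain();
  }

  // Hand waiting jobs to the consumer in FIFO order, if a consumer exists.
  private drain(): void {
    const handler = this.handler;
    if (!handler) return;
    while (this.waiting.length > 0) {
      const job = this.waiting.shift()!;
      handler(job);
    }
  }
}

const sketchQueue = new InMemoryQueue<{ file: string }>();
const processed: string[] = [];

sketchQueue.add('transcode', { file: 'a.wav' }); // stays queued: no consumer yet
sketchQueue.subscribe((job) => processed.push(`${job.id}:${job.data.file}`));
sketchQueue.add('transcode', { file: 'b.wav' }); // delivered straight away
```

The first job waits until a consumer subscribes, which is the decoupling a queue provides: producers do not need to know whether a consumer is currently running.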
Producer
Job producers add jobs to queues.
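import { Injectable } from '@nestjs/common';
import { InjectFlowProducer, InjectQueue } from '@nestjs/bullmq';
import { FlowProducer, Queue } from 'bullmq';

@Injectable()
export class TestService {
  constructor(
    @InjectQueue('test') private readonly testQueue: Queue,
    // Assumption: a flow producer named 'testFlow' is registered with
    // BullModule.registerFlowProducer({ name: 'testFlow' })
    @InjectFlowProducer('testFlow') private readonly flowProducer: FlowProducer,
  ) {}

  async test(): Promise<string> {
    // Simple job: tried at most 2 times in total, with a custom job id
    await this.testQueue.add(
      'immediateJob',
      { test: 1 },
      { attempts: 2, jobId: 'jobId1' },
    );

    // Repeatable job: runs every 2000 ms, at most 2 times
    await this.testQueue.add(
      'schedule job',
      { test: 1 },
      { repeat: { every: 2000, limit: 2 }, jobId: 'repeatJob' },
    );

    // Flow: a parent job is processed only after all of its children complete
    await this.flowProducer.add({
      name: 'flow',
      data: { step: 'flow' },
      queueName: 'test',
      children: [
        {
          name: 'car',
          data: { step: 'wheels' },
          queueName: 'test',
          children: [{ name: 'car', data: { step: 'chassis' }, queueName: 'test' }],
        },
      ],
    });

    return '123';
  }
}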
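The flow in the producer above is a tree, and a parent job only becomes processable once all of its children have completed. The sketch below is an illustration rather than BullMQ code: `FlowNode` and `processingOrder` are hypothetical names, and the function simply walks the same tree shape in post-order to show the order in which the steps would run.

```typescript
// Mirrors the shape of the object passed to the flow producer in the example above.
interface FlowNode {
  name: string;
  queueName: string;
  data: { step: string };
  children?: FlowNode[];
}

// Returns the steps in the order they become processable:
// every child completes before its parent (post-order traversal).
function processingOrder(node: FlowNode): string[] {
  const steps: string[] = [];
  for (const child of node.children ?? []) {
    steps.push(...processingOrder(child));
  }
  steps.push(node.data.step);
  return steps;
}

const flowTree: FlowNode = {
  name: 'flow',
  data: { step: 'flow' },
  queueName: 'test',
  children: [
    {
      name: 'car',
      data: { step: 'wheels' },
      queueName: 'test',
      children: [{ name: 'car', data: { step: 'chassis' }, queueName: 'test' }],
    },
  ],
};

const flowOrder = processingOrder(flowTree);
// flowOrder is ['chassis', 'wheels', 'flow']
```

Sibling jobs have no ordering guarantee relative to each other, so with several children at the same level the sketch shows only one of the possible orders.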
Consumer
A consumer is a class whose methods process the jobs added to the queue, listen for events on the queue, or both.
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';

// Process up to 4 jobs at once, and at most 50 jobs per 2000 ms (rate limit)
@Processor('test', {
  concurrency: 4,
  limiter: {
    max: 50,
    duration: 2000,
  },
})
export class InsightProcessor extends WorkerHost {
  private readonly logger = new Logger(InsightProcessor.name);

  // Core logic for processing a job taken from the queue
  async process(job: Job<any, any, string>): Promise<any> {
    this.logger.log(`job ${job.id} ${JSON.stringify(job.data)}`);
    return 'done';
  }

  // Listen for worker events, here the moment a job becomes active
  @OnWorkerEvent('active')
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`,
    );
  }
}
Lifecycle of job

When a job is added to a queue, it is in one of three states:
“wait”: a waiting list that all jobs must enter before they can be processed.
“prioritized”: the job has a priority, and jobs with a higher priority are processed first.
“delayed”: the job is waiting for a timeout to expire or to be promoted. Delayed jobs are not processed directly; they are placed at the beginning of the waiting list, or in the prioritized set, and are processed as soon as a worker is idle.
The next state for a job is “active”. The active state is represented by a set that holds the jobs currently being processed.
A job ends in either the “completed” or the “failed” state.
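The lifecycle described above can be written down as a small transition table. This is a sketch built only from that description, not from BullMQ's internals: the names `JobState`, `transitions`, and `isValidLifecycle` are hypothetical, and retries (a failed attempt going back to the queue) are deliberately left out.

```typescript
type JobState = 'wait' | 'prioritized' | 'delayed' | 'active' | 'completed' | 'failed';

// Allowed moves, taken from the description above (retries are omitted for brevity).
const transitions: Record<JobState, JobState[]> = {
  wait: ['active'],
  prioritized: ['active'],
  delayed: ['wait', 'prioritized'],
  active: ['completed', 'failed'],
  completed: [],
  failed: [],
};

// States a job can be in immediately after being added to the queue.
const entryStates: JobState[] = ['wait', 'prioritized', 'delayed'];

function canTransition(from: JobState, to: JobState): boolean {
  return transitions[from].includes(to);
}

// Checks that a path starts in an entry state and uses only allowed moves.
function isValidLifecycle(path: JobState[]): boolean {
  if (path.length === 0 || !entryStates.includes(path[0])) return false;
  for (let i = 1; i < path.length; i++) {
    if (!canTransition(path[i - 1], path[i])) return false;
  }
  return true;
}

const delayedThenDone = isValidLifecycle(['delayed', 'wait', 'active', 'completed']); // true
const skipsActive = isValidLifecycle(['wait', 'completed']); // false: must pass through active
```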
Redis Data

Redis acts as the persistent data store for the job status, job details, and so on.

The details of a job, stored under a key that contains the unique job ID

The repeatable job configuration and the jobs it has executed (their job IDs are generated as random strings)

The completed job list, the job execution history, and the repeatable configuration list
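When browsing these keys in Redis, it can help to know how the names are put together. The helpers below are a sketch under stated assumptions: they assume the default key prefix `bull` and a `<prefix>:<queue>:<suffix>` naming pattern, and `jobKey` and `stateKey` are hypothetical helper names, not BullMQ functions. Check the keys in your own Redis instance before relying on this layout.

```typescript
type QueueStateName = 'wait' | 'active' | 'delayed' | 'prioritized' | 'completed' | 'failed';

// Assumption: default prefix "bull"; a custom prefix changes the first segment.
function jobKey(queueName: string, jobId: string, prefix = 'bull'): string {
  return `${prefix}:${queueName}:${jobId}`;
}

// Assumption: each state is tracked under a key named after the state.
function stateKey(queueName: string, state: QueueStateName, prefix = 'bull'): string {
  return `${prefix}:${queueName}:${state}`;
}

const detailKey = jobKey('test', 'jobId1'); // 'bull:test:jobId1'
const completedKey = stateKey('test', 'completed'); // 'bull:test:completed'
```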
Bull Dashboard

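Bull Board visualises the job data stored in Redis.

npm install --save @bull-board/nestjs @bull-board/api

import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
// ExpressAdapter is provided by the @bull-board/express package
import { ExpressAdapter } from '@bull-board/express';
// Application-specific imports (LoggerModule, pinoOptionConfig, HttpExceptionFilter,
// InjectTraceIdMiddleware, InsightModule) are omitted here

@Module({
  imports: [
    LoggerModule.forRoot({
      pinoHttp: pinoOptionConfig(),
    }),
    // Connect BullMQ to Redis
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    // Initialise Bull Board and serve it at /queues
    BullBoardModule.forRoot({
      route: '/queues',
      adapter: ExpressAdapter,
    }),
    InsightModule,
  ],
  providers: [
    {
      provide: APP_FILTER,
      useClass: HttpExceptionFilter,
    },
  ],
})
export class AppModule implements NestModule {
  public configure(consumer: MiddlewareConsumer): void {
    consumer.apply(InjectTraceIdMiddleware).forRoutes('*');
  }
}

Register each queue and its Bull Board adapter in the feature module:

import { Module } from '@nestjs/common';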
import { PrismaModule } from '../prisma/prisma.module';
import { InsightController } from './insight.controller';
import { InsightService } from './insight.service';
import { SearchEngineModule } from '../searchEngine/searchEngine.module';
import { WebScraperModule } from '../webScraper/webScraper.module';
import { BullModule } from '@nestjs/bullmq';
import { InsightGenerationProcessor } from './insightGeneration.process';
import { insightOverallProcessor } from './insightOverall.process';
import { insightArticleProcessor } from './insightArticle.process';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { BullBoardModule } from '@bull-board/nestjs';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'insightGeneration',
    }),
    BullModule.registerQueue({
      name: 'insightArticle',
    }),
    BullModule.registerQueue({
      name: 'insightOverall',
    }),
    BullModule.registerFlowProducer({
      name: 'insightFlow',
    }),
    BullBoardModule.forFeature({
      name: 'insightGeneration',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'insightArticle',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'insightOverall',
      adapter: BullMQAdapter,
    }),
    SearchEngineModule,
    WebScraperModule,
    PrismaModule,
  ],
  controllers: [InsightController],
  providers: [
    InsightService,
    InsightGenerationProcessor,
    insightOverallProcessor,
    insightArticleProcessor,
  ],
})
export class InsightModule {}
Concurrency vs Parallelism
Concurrency in BullMQ
Definition: The ability to make progress on multiple jobs over the same period by switching between them, rather than completing one before starting another.
Mechanism: It is achieved through Node.js's non-blocking, event-driven architecture. A worker can initiate an I/O operation (such as a database query) and then move on to another job without waiting for the first one to complete.
Local Concurrency: The concurrency option of a Worker instance determines how many jobs that single worker instance can process concurrently. For example, concurrency: 5 means one worker can handle five jobs at the same time, switching between them during I/O waits.
Use case: Ideal for I/O-bound jobs, such as API calls or database interactions, because it allows high throughput without blocking the main thread.
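The effect of a local concurrency limit can be sketched without BullMQ. The helper below is hypothetical (`runWithConcurrency` is not a BullMQ function): it keeps at most `concurrency` promise-returning jobs in flight on a single event loop, which is the behaviour the concurrency option describes for one worker. The jobs are simulated with timers standing in for I/O waits.

```typescript
// Hypothetical helper: runs jobs with at most `concurrency` of them in flight at once.
async function runWithConcurrency<T>(
  jobs: Array<() => Promise<T>>,
  concurrency: number,
): Promise<{ results: T[]; peak: number }> {
  const results: T[] = new Array(jobs.length);
  let next = 0; // index of the next job to start
  let inFlight = 0; // jobs currently awaiting I/O
  let peak = 0; // highest number of simultaneous jobs observed

  // Each lane pulls the next job as soon as its previous one finishes.
  async function lane(): Promise<void> {
    while (next < jobs.length) {
      const index = next++;
      inFlight++;
      peak = Math.max(peak, inFlight);
      try {
        results[index] = await jobs[index]();
      } finally {
        inFlight--;
      }
    }
  }

  const laneCount = Math.min(concurrency, jobs.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return { results, peak };
}

// Simulated I/O-bound job: waits on a timer without blocking the event loop.
const fakeIo = (value: number, ms: number) => () =>
  new Promise<number>((resolve) => setTimeout(() => resolve(value), ms));

const ioJobs = [1, 2, 3, 4, 5, 6].map((n) => fakeIo(n * 10, 5));
const concurrencyRun = runWithConcurrency(ioJobs, 3);
```

All six jobs share one thread, so this only pays off when the jobs spend most of their time waiting. A CPU-bound job would hold the event loop and stall the other lanes.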
Parallelism in BullMQ
Definition: The actual, simultaneous execution of multiple computations.
Mechanism: This is achieved by running multiple worker instances in separate processes or threads. On a multi-core processor, they can run their jobs at exactly the same time, independently.
Global Concurrency: BullMQ also supports a globalConcurrency setting on the queue, which sets a maximum limit on the total number of jobs processed across all workers at any given time.
Use case: Necessary for CPU-intensive tasks that need true parallel execution, which is achieved by using multiple worker processes or worker threads.
How to choose
For I/O-bound jobs: Increase the concurrency value of a single worker to take advantage of Node.js's event loop.
For CPU-bound jobs: Run multiple worker processes or threads to achieve parallelism. You can also use Sandboxed Processors for more isolated execution.
Pros & Cons
Advantages
Atomic operations in Redis prevent race conditions among workers and ensure that only one worker receives a given job.
Feature-rich: flows with child jobs, retries, rate limiting, and good lifecycle management.
Easy to adopt in Node.js: it makes good use of the Node.js event loop for concurrency and has built-in NestJS decorators.
Disadvantages
Redis is not well suited to long-term persistent storage or to large data sets, so a hybrid approach that integrates a SQL database for long-term storage may be needed (for example, Redis stores the ID and the SQL database stores the payload).
At the infrastructure level, Redis needs to be managed well, for example with sharding for horizontal scaling of data and read replicas for fault tolerance.
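The hybrid approach from the first disadvantage can be sketched as follows. Everything here is hypothetical: `PayloadStore` is an in-memory stand-in for a SQL table, and `toJobData` and `fromJobData` are illustrative helper names. The point is only that the job data placed on the queue carries a small reference, while the large payload lives elsewhere.

```typescript
// Hypothetical stand-in for a SQL table keyed by id; a real system would use a database client.
class PayloadStore<T> {
  private rows = new Map<string, T>();
  private nextId = 1;

  save(payload: T): string {
    const id = `payload-${this.nextId++}`;
    this.rows.set(id, payload);
    return id;
  }

  load(id: string): T {
    const payload = this.rows.get(id);
    if (payload === undefined) throw new Error(`payload ${id} not found`);
    return payload;
  }
}

interface Article {
  url: string;
  body: string;
}

const payloadStore = new PayloadStore<Article>();

// Producer side: persist the large payload and enqueue only its id.
function toJobData(article: Article): { payloadId: string } {
  return { payloadId: payloadStore.save(article) };
}

// Consumer side: resolve the id back to the full payload before processing.
function fromJobData(data: { payloadId: string }): Article {
  return payloadStore.load(data.payloadId);
}

const article: Article = { url: 'https://example.com/post', body: 'x'.repeat(10000) };
const smallJobData = toJobData(article); // this small object is what would go on the queue
const restored = fromJobData(smallJobData);
```

The trade-off is an extra database round trip in the consumer, and the two stores must be cleaned up together so that a removed job does not leave an orphaned payload.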