BullMQ

Introduction

  • BullMQ is a Node.js library that implements a fast and robust queue system on top of Redis. It is aimed at the background-processing needs of modern microservice architectures.

  • Redis stores each job's details, configuration, and execution history. Jobs are distinguished by their job ID.

  • Queues let you break up monolithic tasks that might otherwise block the Node.js event loop. For example, if a user request requires CPU-intensive work such as audio transcoding, you can delegate that work to other processes so that user-facing processes remain responsive.

  • BullMQ uses a publisher-subscriber mechanism. The producer creates a job with a unique ID and a data payload and adds it to a queue. The consumer subscribes to that queue in order to receive the job and execute the processing logic.
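The handoff described above can be sketched with a toy in-memory queue. This is only an illustration of the idea, not the BullMQ API: `ToyQueue`, `ToyJob`, and `subscribe` are hypothetical names, and nothing here touches Redis.

```typescript
// Toy in-memory model of the producer/consumer handoff (not the BullMQ API).
interface ToyJob<T> {
  id: string;
  name: string;
  data: T;
}

type Handler<T> = (job: ToyJob<T>) => void;

class ToyQueue<T> {
  private waiting: ToyJob<T>[] = [];
  private handler: Handler<T> | undefined;
  private nextId = 1;

  // Producer side: wrap the payload in a job with a unique id and enqueue it.
  add(name: string, data: T): ToyJob<T> {
    const job: ToyJob<T> = { id: String(this.nextId++), name, data };
    this.waiting.push(job);
    this.drain();
    return job;
  }

  // Consumer side: register the function that executes each job.
  subscribe(handler: Handler<T>): void {
    this.handler = handler;
    this.drain();
  }

  // Hand waiting jobs to the consumer in FIFO order, if a consumer exists.
  private drain(): void {
    const handler = this.handler;
    if (handler === undefined) return;
    let job = this.waiting.shift();
    while (job !== undefined) {
      handler(job);
      job = this.waiting.shift();
    }
  }
}

const queue = new ToyQueue<{ test: number }>();
const processed: string[] = [];

// The first job is queued before any consumer exists and waits.
queue.add('immediateJob', { test: 1 });
queue.subscribe((job) => {
  processed.push(`${job.id}:${job.name}:${job.data.test}`);
});
// The second job is delivered as soon as it is added.
queue.add('immediateJob', { test: 2 });
```

The point of the sketch is the decoupling: the producer never calls the consumer directly, so a job added before any consumer is attached is simply held until one subscribes.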

Producer

  • Job producers add jobs to queues.

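import { Injectable } from '@nestjs/common';
import { FlowProducer, Queue } from 'bullmq';
import { InjectFlowProducer, InjectQueue } from '@nestjs/bullmq';

@Injectable()
export class TestService {
  constructor(
    @InjectQueue('test') private readonly testQueue: Queue,
    // 'testFlow' is an assumed name; it must match the name passed to
    // BullModule.registerFlowProducer({ name }) in the module
    @InjectFlowProducer('testFlow') private readonly flowProducer: FlowProducer,
  ) {}

  async test() {
    // simple job: up to 2 attempts in total, with a custom job id
    await this.testQueue.add(
      'immediateJob',
      { test: 1 },
      { attempts: 2, jobId: 'jobId1' },
    );
    // repeatable job: every 2000 ms, at most 2 times
    await this.testQueue.add(
       'schedule job',
       { test: 1 },
       { repeat: { every: 2000, limit: 2 }, jobId: 'repeatJob' },
    );
    // flow: a parent job is processed only after its children complete
    await this.flowProducer.add({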
      name: 'flow',
      data: { step: 'flow' },
      queueName: 'test',
      children: [
        {
          name: 'car',
          data: { step: 'wheels' },
          queueName: 'test',
          children: [{ name: 'car', data: { step: 'chassis' }, queueName: 'test' }],
        },
      ],
    });    
    return '123';
  }
}
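In a BullMQ flow, a parent job is not processed until all of its children have completed, so the nested flow above runs from the innermost child outwards. The following is a minimal sketch of that ordering using a plain post-order walk. `FlowNode` and `processingOrder` are hypothetical names for illustration, and the model ignores the fact that sibling jobs have no guaranteed order relative to each other.

```typescript
// Simplified model of a flow tree; the field names mirror the flow added above.
interface FlowNode {
  name: string;
  queueName: string;
  data: { step: string };
  children?: FlowNode[];
}

// Post-order walk: every child is visited before its parent, which is the
// order in which a parent/child chain becomes processable.
function processingOrder(node: FlowNode): string[] {
  const order: string[] = [];
  for (const child of node.children ?? []) {
    order.push(...processingOrder(child));
  }
  order.push(node.data.step);
  return order;
}

const flow: FlowNode = {
  name: 'flow',
  data: { step: 'flow' },
  queueName: 'test',
  children: [
    {
      name: 'car',
      data: { step: 'wheels' },
      queueName: 'test',
      children: [{ name: 'car', data: { step: 'chassis' }, queueName: 'test' }],
    },
  ],
};

const steps = processingOrder(flow);
// steps is ['chassis', 'wheels', 'flow']
```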

Consumer

  • A consumer is a class whose methods process jobs added to the queue.

  • It can also listen for events on the queue.

import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
// concurrency: jobs this worker processes in parallel
// limiter: rate limit of at most `max` jobs per `duration` milliseconds
@Processor('test', {
  concurrency: 4,
  limiter: {
    max: 50,
    duration: 2000,
  },
})
export class InsightProcessor extends WorkerHost {
  private readonly logger = new Logger(InsightProcessor.name);
  // Core logic: called for each job taken from the queue
  async process(job: Job<any, any, string>): Promise<any> {
    this.logger.log(`job ${job.id} ${JSON.stringify(job.data)}`);
    return 'done';
  }
  // Listen for job state changes emitted by the worker
  @OnWorkerEvent('active')
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`,
    );
  }
}
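To make the `limiter` numbers in the processor above concrete: `max: 50` with `duration: 2000` caps processing at 50 jobs per 2000 ms, or 25 jobs per second on average. The sketch below models that cap as a simple fixed-window counter. It is an assumption-laden illustration (`WindowLimiter` and `tryStart` are hypothetical names); BullMQ enforces its limit inside Redis and its exact windowing may differ.

```typescript
// Toy fixed-window limiter: at most `max` jobs may start per `duration` ms.
class WindowLimiter {
  private windowStart = Number.NEGATIVE_INFINITY;
  private count = 0;

  constructor(
    private readonly max: number,
    private readonly duration: number,
  ) {}

  // Returns true if a job may start at time `now` (in ms), false if it must wait.
  tryStart(now: number): boolean {
    if (now - this.windowStart >= this.duration) {
      // The previous window has expired, so open a new one.
      this.windowStart = now;
      this.count = 0;
    }
    if (this.count >= this.max) {
      return false;
    }
    this.count++;
    return true;
  }
}

// Same numbers as the @Processor options above.
const limiter = new WindowLimiter(50, 2000);

// 60 jobs arrive at the same instant; only 50 fit in the window.
let started = 0;
for (let i = 0; i < 60; i++) {
  if (limiter.tryStart(100)) started++;
}

// 2000 ms later the window has expired and jobs may start again.
const allowedAfterWindow = limiter.tryStart(2100);

// Average ceiling implied by the options: 50 / (2000 / 1000) = 25 jobs per second.
const ceilingPerSecond = 50 / (2000 / 1000);
```

Note that `concurrency` is a separate setting: it bounds how many jobs one worker runs in parallel, while the limiter bounds how many jobs start per time window.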

Lifecycle of job

When a job is added to a queue it can be in one of three states:

  • “wait”: a waiting list, where all jobs must enter before they can be processed.

  • “prioritized”: implies that a job with higher priority will be processed first.

  • “delayed”: implies that the job is waiting for some timeout or to be promoted for being processed. These jobs are not processed directly, but instead are placed at the beginning of the waiting list, or in a prioritized set, and processed as soon as a worker is idle.

The next state for a job is the “active” state. The active state is represented by a set that contains the jobs currently being processed.

Finally, the job ends in either the “completed” or the “failed” state.
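The lifecycle described above can be written down as a small transition table. This is a simplified sketch covering only the states named in this section (it leaves out retries, for example). `canMove` and `initialState` are hypothetical helpers, and the mapping from the `delay` and `priority` job options to an initial state is an assumption made for illustration.

```typescript
type JobState =
  | 'wait'
  | 'prioritized'
  | 'delayed'
  | 'active'
  | 'completed'
  | 'failed';

// Allowed moves between states, following the description above.
const transitions: Record<JobState, JobState[]> = {
  delayed: ['wait', 'prioritized'], // promoted once the delay has elapsed
  wait: ['active'],
  prioritized: ['active'],
  active: ['completed', 'failed'],
  completed: [],
  failed: [],
};

function canMove(from: JobState, to: JobState): boolean {
  return transitions[from].indexOf(to) !== -1;
}

// Assumed mapping from job options to the state a newly added job starts in.
function initialState(opts: { delay?: number; priority?: number }): JobState {
  if (opts.delay !== undefined && opts.delay > 0) return 'delayed';
  if (opts.priority !== undefined && opts.priority > 0) return 'prioritized';
  return 'wait';
}

const plain = initialState({});
const delayed = initialState({ delay: 5000 });
const urgent = initialState({ priority: 1 });
```

Reading the table: a delayed job can never jump straight to “active”, it must first be promoted to the waiting list or the prioritized set.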

Redis Data

  • Redis acts as a persistent data store for the job status, job details, and related metadata.

  • The details of each job are stored under a key that contains the unique job ID.

  • The repeatable job configuration is stored together with the jobs executed from it (their job IDs are generated as random strings).

  • Redis also holds the completed job list, the job execution history, and the list of repeatable configurations.
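When inspecting these entries with a Redis client, it helps to know how the key names are put together. The sketch below assumes BullMQ's default `bull` key prefix and the `prefix:queueName:suffix` layout. `jobKey` and `queueKey` are hypothetical helpers, and the exact set of suffixes (notably `repeat`) may vary between BullMQ versions.

```typescript
// Assumed default key prefix; it can be changed through the queue options.
const PREFIX = 'bull';

type QueueSuffix =
  | 'wait'
  | 'active'
  | 'delayed'
  | 'prioritized'
  | 'completed'
  | 'failed'
  | 'repeat';

// Key of the hash holding one job's details, addressed by its job id.
function jobKey(queueName: string, jobId: string): string {
  return `${PREFIX}:${queueName}:${jobId}`;
}

// Key of one of the per-queue collections (job lists and repeat configuration).
function queueKey(queueName: string, suffix: QueueSuffix): string {
  return `${PREFIX}:${queueName}:${suffix}`;
}

const detailKey = jobKey('test', 'jobId1');
const completedKey = queueKey('test', 'completed');
const repeatKey = queueKey('test', 'repeat');
```

With these names, a command such as `HGETALL bull:test:jobId1` in `redis-cli` would show the stored fields of the job added with `jobId: 'jobId1'`, assuming the default prefix is in use.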

Bull Dashboard

  • Bull Board is a dashboard that visualises the job data stored in Redis.

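npm install --save @bull-board/nestjs @bull-board/api @bull-board/express

import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bullmq';
import { BullBoardModule } from '@bull-board/nestjs';
import { ExpressAdapter } from '@bull-board/express';
import { LoggerModule } from 'nestjs-pino';
// project-local imports (pinoOptionConfig, HttpExceptionFilter,
// InjectTraceIdMiddleware, InsightModule) are omitted here

@Module({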
  imports: [
    LoggerModule.forRoot({
      pinoHttp: pinoOptionConfig(),
    }),
    // connect redis with bullmq
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
    }),
    // init bull board
    BullBoardModule.forRoot({
      route: '/queues',
      adapter: ExpressAdapter,
    }),
    InsightModule,
  ],
  providers: [
    {
      provide: APP_FILTER,
      useClass: HttpExceptionFilter,
    },
  ],
})
export class AppModule implements NestModule {
  public configure(consumer: MiddlewareConsumer): void {
    consumer.apply(InjectTraceIdMiddleware).forRoutes('*');
  }
}

// feature module: registers the queues, the flow producer, and the board adapters
import { Module } from '@nestjs/common';
import { PrismaModule } from '../prisma/prisma.module';
import { InsightController } from './insight.controller';
import { InsightService } from './insight.service';
import { SearchEngineModule } from '../searchEngine/searchEngine.module';
import { WebScraperModule } from '../webScraper/webScraper.module';
import { BullModule } from '@nestjs/bullmq';
import { InsightGenerationProcessor } from './insightGeneration.process';
import { insightOverallProcessor } from './insightOverall.process';
import { insightArticleProcessor } from './insightArticle.process';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { BullBoardModule } from '@bull-board/nestjs';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'insightGeneration',
    }),
    BullModule.registerQueue({
      name: 'insightArticle',
    }),
    BullModule.registerQueue({
      name: 'insightOverall',
    }),
    BullModule.registerFlowProducer({
      name: 'insightFlow',
    }),
    BullBoardModule.forFeature({
      name: 'insightGeneration',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'insightArticle',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'insightOverall',
      adapter: BullMQAdapter,
    }),
    SearchEngineModule,
    WebScraperModule,
    PrismaModule,
  ],
  controllers: [InsightController],
  providers: [
    InsightService,
    InsightGenerationProcessor,
    insightOverallProcessor,
    insightArticleProcessor,
  ],
})
export class InsightModule {}
