🖍️
Developer Note
  • Welcome
  • Git
    • Eslint & Prettier & Stylelint & Husky
  • Programming Language
    • JavaScript
      • Script Async vs Defer
      • Module
      • Const VS Let VS Var
      • Promise
      • Event Loop
      • Execution Context
      • Hoisting
      • Closure
      • Event Bubbling and Capturing
      • Garbage Collection
      • This
      • Routing
      • Debounce and Throttle
      • Web Component
      • Iterator
      • Syntax
      • String
      • Array
      • Object
      • Proxy & Reflect
      • ProtoType
      • Class
      • Immutability
      • Typeof & Instanceof
      • Npm (Node package manager)
    • TypeScript
      • Utility Type
      • Type vs Interface
      • Any vs Unknown vs Never
      • Void and undefined
      • Strict Mode
      • Namespace
      • Enum
      • Module
      • Generic
    • Python
      • Local Development
      • Uv
      • Asyncio & Event loop
      • Context Manager
      • Iterator & Generator
      • Fast API
      • Pydantic & Data Class
    • Java
      • Compilation and Execution
      • Data Type
      • Enumeration
      • Data Structure
      • Try Catch
      • InputStream and OutputStream
      • Concurrent
      • Unicode Block
      • Build Tools
      • Servlet
      • Java 8
  • Coding Pattern
    • MVC vs MVVM
    • OOP vs Functional
    • Error Handling
    • MVC vs Flux
    • Imperative vs Declarative
    • Design Pattern
  • Web Communication
    • REST API
      • Web Hook
      • CORS issue
    • HTTPS
    • GraphQL
      • REST API vs GraphQL
      • Implementation (NodeJS + React)
    • Server-Sent Event
    • Web Socket
    • IP
    • Domain Name System (DNS)
  • Frontend
    • Progressive Web App (PWA)
    • Single Page & Multiple Page Application
    • Search Engine Optimization (SEO)
    • Web bundling & Micro-frontend
      • Webpack
        • Using Webpack to build React Application
        • Using Webpack to build react library
      • Vite
      • Using rollup to build react library
      • Implementing micro frontend
    • Web Security
      • CSRF & Nonce
      • XSS
      • Click hijacking
    • Cypress
    • CSS
      • Core
        • Box Model
        • Inline vs Block
        • Flexbox & Grid
        • Pseudo Class
        • Position
      • Tailwind CSS
        • Shadcn
      • CSS In JS
        • Material UI
    • React
      • Core
        • Component Pattern
        • React Lazy & Suspense
        • React Portal
        • Error Boundary
        • Rendering Methods
        • Environment Variable
        • Conditional CSS
        • Memo
        • Forward Reference
        • High Order Component (HOC) & Custom Hook
        • TypeScript
      • State Management
        • Redux
        • Recoil
        • Zustand
      • Routing
        • React Router Dom
      • Data Fetching
        • Axios & Hook
        • React Query
        • Orval
      • Table
        • React Table
      • Form & Validation
        • React Hook Form
        • Zod
      • NextJS
        • Page Router
        • App Router
      • React Native
    • Angular
    • Svelte
      • Svelte Kit
  • Backend
    • Cache
      • Browser Cache
      • Web Browser Storage
      • Proxy
      • Redis
    • Rate limit
    • Load Test
    • Encryption
    • Authentication
      • Password Protection
      • Cookie & Session
      • JSON Web Token
      • SSO
        • OAuth 2.0
        • OpenID Connect (OIDC)
        • SAML
    • Payment
      • Pre-built
      • Custom
    • File Handling
      • Upload & Download (Front-end)
      • Stream & Buffer
    • Microservice
      • API Gateway
      • Service Discovery
      • Load Balancer
      • Circuit Breaker
      • Message Broker
      • BulkHead & Zipkin
    • Elastic Search
    • Database
      • SQL
        • Group By vs Distinct
        • Index
        • N + 1 problem
        • Normalization
        • Foreign Key
        • Relationship
        • Union & Join
        • User Defined Type
      • NOSQL (MongoDB)
      • Transaction
      • Sharding
      • Lock (Concurrency Control)
    • NodeJS
      • NodeJS vs Java Spring
      • ExpressJS
      • NestJS
        • Swagger
        • Class Validator & Validation Pipe
        • Passport (Authentication)
      • Path Module
      • Database Connection
        • Integrating with MYSQL
        • Sequelize
        • Integrating with MongoDB
        • Prisma
        • MikroORM
        • Mongoose
      • Streaming
      • Worker Thread
      • Passport JS
      • JSON Web Token
      • Socket IO
      • Bull MQ
      • Pino (Logging)
      • Yeoman
    • Spring
      • Spring MVC
      • Spring REST
      • Spring Actuator
      • Aspect Oriented Programming (AOP)
      • Controller Advice
      • Filter
      • Interceptor
      • Concurrent
      • Spring Security
      • Spring Boot
      • Spring Cloud
        • Resilience 4j
      • Quartz vs Spring Batch
      • JPA and Hibernate
      • HATEOAS
      • Swagger
      • Unit Test (Java Spring)
      • Unit Test (Spring boot)
  • DevOp
    • Docker
    • Kubernetes
      • Helm
    • Nginx
    • Observability
      • Logging
        • Loki
        • PLG Setup
      • Distributed Tracing
        • OpenTelemetry
        • Tempo
    • File System
    • Cloud
      • AWS
        • EC2 (Virtual Machine)
        • Network
        • IAM
          • Role-Service Binding
        • Database
        • Route 53
        • S3
        • Message Queue
        • Application Service
        • Serverless Framework
        • Data Analysis
        • Machine Learning
        • Monitoring
        • Security
      • Azure
        • Identity
        • Compute Resource
        • Networking
        • Storage
        • Monitoring
      • Google Cloud
        • IAM
          • Workload Identity Federation
        • Compute Engine
        • VPC Network
        • Storage
        • Kubernetes Engine
        • App Engine
        • Cloud function
        • Cloud Run
        • Infra as Code
        • Pub/Sub
    • Deployment Strategy
    • Jenkins
    • Examples
      • Deploy NextJS on GCP
      • Deploy Spring on Azure
      • Deploy React on Azure
  • Domain Knowledge
    • Web 3
      • Blockchain
      • Cryptocurrency
    • AI
      • Prompt
      • Chain & Agent
      • MCP
      • LangChain
      • Chunking
      • Search
      • Side Products
      • Crawl4AI
Powered by GitBook
On this page
  • Introduction
  • Producer
  • Consumer
  • Lifecycle of job
  • Redis Data
  • Bull Dashboard

Was this helpful?

  1. Backend
  2. NodeJS

Bull MQ

PreviousSocket IONextPino (Logging)

Last updated 9 months ago

Was this helpful?

Introduction

  • BullMQ is a library that implements a fast and robust queue system built on top of Redis. It helps solve many of the problems that arise in modern microservice architectures.

  • Redis is used to store the execution history of jobs as well as the details and configuration of each job. Jobs are distinguished by their job ID.

  • Break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.

  • It uses a publisher-subscriber mechanism. The producer creates a job with a unique ID and data and adds it to the queue. The consumer subscribes to a queue in order to receive jobs and execute the processing logic.

Producer

  • Job producers add jobs to queues.

import { Injectable } from '@nestjs/common';
import { InjectFlowProducer, InjectQueue } from '@nestjs/bullmq';
import { FlowProducer, Queue } from 'bullmq';

/**
 * Example job producer: adds an immediate job, a repeatable job and a
 * parent/child flow to the `test` queue.
 */
@Injectable()
export class TestService {
  constructor(
    @InjectQueue('test') private readonly testQueue: Queue,
    // The flow producer has to be injected before it can be used. The name
    // 'testFlow' is an example and must match a registration made with
    // BullModule.registerFlowProducer({ name: 'testFlow' }) in the module.
    @InjectFlowProducer('testFlow') private readonly flowProducer: FlowProducer,
  ) {}

  /**
   * Enqueues the three example jobs.
   *
   * @returns the fixed string `'123'` once every job has been added.
   */
  async test(): Promise<string> {
    // Simple job: explicit job id, two attempts configured.
    await this.testQueue.add(
      'immediateJob',
      { test: 1 },
      { attempts: 2, jobId: 'jobId1' },
    );

    // Repeatable job: configured to repeat every 2000 ms, limited to 2 runs.
    await this.testQueue.add(
      'schedule job',
      { test: 1 },
      { repeat: { every: 2000, limit: 2 }, jobId: 'repeatJob' },
    );

    // Flow: a tree of jobs. In BullMQ a parent job is processed only after
    // all of its children have completed.
    await this.flowProducer.add({
      name: 'flow',
      data: { step: 'flow' },
      queueName: 'test',
      children: [
        {
          name: 'car',
          data: { step: 'wheels' },
          queueName: 'test',
          children: [
            { name: 'car', data: { step: 'chassis' }, queueName: 'test' },
          ],
        },
      ],
    });

    return '123';
  }
}

Consumer

  • A consumer is a class that defines methods to process the jobs added to the queue.

  • A consumer can also listen for events on the queue.

import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';

/**
 * Example consumer for the `test` queue.
 *
 * The worker options set the concurrency to 4 and define a rate limit of
 * `max: 50` jobs per `duration: 2000` milliseconds.
 */
@Processor('test', {
  concurrency: 4,
  limiter: {
    max: 50,
    duration: 2000,
  },
})
export class InsightProcessor extends WorkerHost {
  private readonly logger = new Logger(InsightProcessor.name);

  /**
   * Core logic for processing a job taken from the queue.
   *
   * @param job the job handed to this worker.
   * @returns the fixed string `'done'`.
   */
  async process(job: Job<any, any, string>): Promise<any> {
    // Serialize the payload explicitly so objects are logged in full.
    this.logger.log(`job ${job.id} ${JSON.stringify(job.data)}`);
    return 'done';
  }

  /**
   * Handler for the worker's `active` event.
   *
   * @param job the job the event was emitted for.
   */
  @OnWorkerEvent('active')
  onActive(job: Job): void {
    // Interpolating an object directly would print "[object Object]".
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${JSON.stringify(job.data)}...`,
    );
  }
}

Lifecycle of job

When a job is added to a queue it can be in one of three states:

  • “wait”: a waiting list, where all jobs must enter before they can be processed.

  • “prioritized”: implies that a job with higher priority will be processed first.

  • “delayed”: implies that the job is waiting for some timeout or to be promoted for being processed. These jobs are not processed directly, but instead are placed at the beginning of the waiting list, or in a prioritized set, and processed as soon as a worker is idle.

The next state for a job is the “active” state. The active state is represented by a set that contains the jobs currently being processed.

Finally, the job will end in either the “completed” or the “failed” state.

Redis Data

  • Redis acts as a persistent data store for the job status, job details, etc.

  • The details of each job are stored under a key that is the unique job ID.

  • The repeatable job configuration and the jobs executed from it (their job IDs are generated as random strings).

  • The completed job list, the history of execution of job and the repeatable configuration list

Bull Dashboard

  • To visualise the job data stored in Redis.

npm install --save @bull-board/nestjs @bull-board/api
// HTTP logging, configured from the project's pino options.
const loggerRootModule = LoggerModule.forRoot({
  pinoHttp: pinoOptionConfig(),
});

// Redis connection used by BullMQ.
const bullRootModule = BullModule.forRoot({
  connection: { host: 'localhost', port: 6379 },
});

// Bull Board initialisation: configured with the `/queues` route and the
// Express adapter.
const bullBoardRootModule = BullBoardModule.forRoot({
  route: '/queues',
  adapter: ExpressAdapter,
});

/**
 * Root application module: wires logging, the BullMQ Redis connection,
 * Bull Board and the insight feature module, and registers the global
 * HTTP exception filter.
 */
@Module({
  imports: [
    loggerRootModule,
    bullRootModule,
    bullBoardRootModule,
    InsightModule,
  ],
  providers: [{ provide: APP_FILTER, useClass: HttpExceptionFilter }],
})
export class AppModule implements NestModule {
  /** Applies the trace-id middleware to every route. */
  configure(consumer: MiddlewareConsumer): void {
    consumer.apply(InjectTraceIdMiddleware).forRoutes('*');
  }
}
import { Module } from '@nestjs/common';
import { PrismaModule } from '../prisma/prisma.module';
import { InsightController } from './insight.controller';
import { InsightService } from './insight.service';
import { SearchEngineModule } from '../searchEngine/searchEngine.module';
import { WebScraperModule } from '../webScraper/webScraper.module';
import { BullModule } from '@nestjs/bullmq';
import { InsightGenerationProcessor } from './insightGeneration.process';
import { insightOverallProcessor } from './insightOverall.process';
import { insightArticleProcessor } from './insightArticle.process';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { BullBoardModule } from '@bull-board/nestjs';

// Queues owned by the insight feature. Each one is registered with BullMQ
// and exposed on Bull Board, in this order.
const insightQueueNames = [
  'insightGeneration',
  'insightArticle',
  'insightOverall',
];

/**
 * Feature module for insights: registers its queues, the `insightFlow`
 * flow producer, the Bull Board views for those queues, and the
 * processors that consume them.
 */
@Module({
  imports: [
    // One BullMQ queue registration per queue name.
    ...insightQueueNames.map((name) => BullModule.registerQueue({ name })),
    BullModule.registerFlowProducer({ name: 'insightFlow' }),
    // One Bull Board entry per queue, using the BullMQ adapter.
    ...insightQueueNames.map((name) =>
      BullBoardModule.forFeature({ name, adapter: BullMQAdapter }),
    ),
    SearchEngineModule,
    WebScraperModule,
    PrismaModule,
  ],
  controllers: [InsightController],
  providers: [
    InsightService,
    InsightGenerationProcessor,
    insightOverallProcessor,
    insightArticleProcessor,
  ],
})
export class InsightModule {}
Node.js
Redis