Worker Thread

Background

  • CPU-intensive work, such as computing a Fibonacci number, checking whether a number is prime, or running heavy machine-learning workloads, makes a Node.js application struggle, because Node.js runs your JavaScript on a single thread and therefore on a single CPU core, no matter how many cores the machine has.

  • Because Node.js handles requests on a single thread, a heavy workload that blocks the event loop also blocks every request that arrives after it.

const express = require("express")
const app = express()

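app.get("/isprime", (req, res) => {
  const number = parseInt(req.query.number, 10)
  const startTime = new Date()
  // Runs synchronously on the main thread, so the event loop is blocked until it returns
  const result = doingCPUHighWorkload(number)
  const endTime = new Date()
  res.json({
    number,
    isprime: result,
    time: endTime.getTime() - startTime.getTime() + "ms",
  })
})

// While /isprime is still computing, this request has to wait
app.get("/testrequest", (req, res) => {
  res.send("I am unblocked now")
})

// Deliberately naive trial division, standing in for any CPU-heavy task
const doingCPUHighWorkload = (number) => {
  if (!Number.isInteger(number) || number < 2) return false
  for (let i = 2; i < number; i++) {
    if (number % i === 0) return false
  }
  return true
}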

app.listen(3000, () => console.log("listening on port 3000"))

  • To keep the event loop from being blocked, move the heavy workload onto a separate thread, called a worker thread.
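
As a minimal sketch of that idea, the prime check can be handed to a worker so the main thread only waits for a message. The helper name `isPrimeInWorker` and the inline worker source are assumptions made for illustration; they are not part of the original example, and a real project would usually keep the worker code in its own file.

```javascript
const { Worker } = require('node:worker_threads')

// Worker source kept inline (eval: true) so the sketch fits in one file.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  const isPrime = (n) => {
    if (!Number.isInteger(n) || n < 2) return false
    for (let i = 2; i * i <= n; i++) {
      if (n % i === 0) return false
    }
    return true
  }
  parentPort.postMessage(isPrime(workerData))
`

// Hypothetical helper: resolves with the worker's answer, rejects on failure
const isPrimeInWorker = (number) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: number })
    worker.once('message', resolve)
    worker.once('error', reject)
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`))
    })
  })

// Usage: the main thread stays free while the worker computes
isPrimeInWorker(1000003).then((result) => console.log('isprime:', result))
```

Inside the Express handler above, the synchronous call could then be replaced with `await isPrimeInWorker(number)`, which should leave /testrequest responsive while the calculation runs.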

Introduction

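  • When a Node.js process is launched, it starts with one process, one thread, one event loop, and one V8 engine instance. Each worker thread adds another thread to that process, with its own event loop and its own V8 isolate.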

  • A V8 isolate is an independent instance of the Chrome V8 runtime, with its own JS heap and its own microtask queue. This setup lets every Node.js worker execute its JavaScript code in complete isolation from other workers. The isolation comes at a cost: workers cannot directly access each other’s heaps.

  • An event-based messaging system is provided so values can be exchanged between threads.
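
The isolation described above can be observed with a small sketch: an object passed to a worker is copied, so changes made inside the worker never reach the main thread's original. The helper name `roundTrip` and the inline worker source are assumptions made for this illustration.

```javascript
const { Worker } = require('node:worker_threads')

// The worker mutates the object it received, then sends its own copy back.
const workerSource = `
  const { parentPort, workerData } = require('node:worker_threads')
  workerData.counter += 1
  workerData.seenBy = 'worker'
  parentPort.postMessage(workerData)
`

// Hypothetical helper: returns both the original object and the worker's reply
const roundTrip = (original) =>
  new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: original })
    worker.once('message', (reply) => resolve({ original, reply }))
    worker.once('error', reject)
  })

roundTrip({ counter: 0 }).then(({ original, reply }) => {
  console.log(original) // the main thread's object, untouched by the worker
  console.log(reply)    // a separate copy carrying the worker's changes
})
```

Because values are copied when they cross the thread boundary, large payloads carry a copying cost, which is worth keeping in mind when deciding what to send to a worker.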

Worker Event

A Worker emits a few events that we can attach callbacks to:

  • message: The 'message' event is emitted when the worker thread has invoked parentPort.postMessage().

  • exit: The 'exit' event is emitted once the worker has stopped. If the worker exited by calling process.exit(), the exitCode parameter is the passed exit code. If the worker was terminated, the exitCode parameter is 1. This is the final event emitted by any Worker instance.

  • error: The 'error' event is emitted if the worker thread throws an uncaught exception. In that case, the worker is terminated.

  • online: The 'online' event is emitted when the worker thread has started executing JavaScript code.
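
To see these events in practice, the following sketch records each one as it fires. The helper name `recordEvents` and the inline worker source are assumptions made for illustration only.

```javascript
const { Worker } = require('node:worker_threads')

// A worker that sends a single message and then finishes on its own
const workerSource = `
  const { parentPort } = require('node:worker_threads')
  parentPort.postMessage('hello from worker')
`

// Hypothetical helper: collects the events a worker emits until it exits
const recordEvents = (source) =>
  new Promise((resolve) => {
    const events = []
    const worker = new Worker(source, { eval: true })
    worker.on('online', () => events.push('online'))
    worker.on('message', (value) => events.push(`message: ${value}`))
    worker.on('error', (err) => events.push(`error: ${err.message}`))
    worker.on('exit', (code) => {
      // 'exit' is the final event, so the list is complete here
      events.push(`exit: ${code}`)
      resolve(events)
    })
  })

recordEvents(workerSource).then((events) => console.log(events))
```

Passing a source string that throws, for example `throw new Error('boom')`, should produce an 'error' entry followed by an exit code of 1 instead of a 'message' entry.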

const {
  Worker, isMainThread, parentPort, workerData,
} = require('node:worker_threads');

if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script,
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const { parse } = require('some-js-parsing-library');
  const script = workerData;
  parentPort.postMessage(parse(script));
}

Example

import { Injectable, Logger } from '@nestjs/common';
import { Worker, isMainThread } from 'worker_threads';
import workerThreadFilePath from './worker-threads/config';

@Injectable()
export class AppService {
  private readonly logger = new Logger(AppService.name);

  checkMainThread() {
    this.logger.debug(
      'Are we on the main thread here?',
      isMainThread ? 'Yes.' : 'No.',
    );
  }

  // do not run this from the worker thread or you will spawn an infinite number of threads in cascade
  runWorker(fibonacci: number): string {
    this.checkMainThread();

    const worker = new Worker(__dirname + '/findFibonacciSum.js', {
      workerData: fibonacci,
    });
    // Arrow functions keep `this` bound to the service, so no alias is needed
    worker.on('message', (fibonacciSum) => {
      this.logger.verbose('Calculated sum', fibonacciSum);
    });
    worker.on('error', (e) => console.log('on error', e));
    worker.on('exit', (code) => console.log('on exit', code));

    return 'Processing the fibonacci sum... Check NestJS app console for the result.';
  }
}

// findFibonacciSum.js (worker entry file, shown here as its TypeScript source before compilation)
import { NestFactory } from '@nestjs/core';
import { workerData, parentPort } from 'worker_threads';
import { AppModule } from '../app.module';
import { FibonacciService } from '../fibonacci/fibonacci.service';
import { AppService } from '../app.service';

async function run() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const appService = app.get(AppService);
  const fibonacciService = app.get(FibonacciService);

  const fibonacciNumber: number = workerData; // this is data received from main thread


  // here we apply business logic inside the worker thread
  appService.checkMainThread();
  const fibonacciSum = fibonacciService.fibonacci(fibonacciNumber);
  parentPort.postMessage(fibonacciSum);
  // throw new Error("test");
  // process.exit();
}

run();
