Streaming

Readable Stream

  • A readable stream represents a data source whose content can be consumed chunk by chunk, instead of being loaded into memory all at once

  • For example: a file (fs.createReadStream), an S3 object (GET request), an incoming HTTP request

const fs = require("fs");
const { Readable } = require("stream");
let data = "";

// Create a readable stream to read the file
const readerStream = fs.createReadStream("file.txt");

// Decode chunks as UTF-8 strings instead of raw Buffers
readerStream.setEncoding("utf8");

// Create a readable stream manually (read() is a no-op because we push the data ourselves)
// Readable.from("123") would build an equivalent stream directly from a string
const stream = new Readable({ read() {} });
// Push the chunks into the stream
for (let i = 0; i < 1; i++) {
  stream.push("123");
}
// Pushing null signals the end of the stream
stream.push(null);
console.log("stream end");

// Listen for data from the readable stream
readerStream.on("data", function (chunk) {
  data += chunk;
});

// Listen for the end of the stream
readerStream.on("end", function () {
  console.log(data);
});

// Listen for errors
readerStream.on("error", function (err) {
  console.log(err.stack);
});

// Alternatively, consume the chunks with for await
// (this needs an async function or an ES module)
(async () => {
  for await (const chunk of stream) {
    console.log(chunk);
  }
})();

console.log("Program Ended");
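
To make the "consume it by chunks" idea concrete without touching the file system, here is a minimal sketch. The generator `numberChunks` and the helper `collectChunks` are hypothetical names made up for this illustration; only `Readable.from` and `for await` come from Node.js itself.

```javascript
const { Readable } = require("stream");

// Hypothetical data source: yields one string chunk at a time
function* numberChunks(count) {
  for (let i = 1; i <= count; i++) {
    yield `chunk-${i}\n`;
  }
}

// Hypothetical helper: reads a stream chunk by chunk and keeps every chunk
async function collectChunks(readable) {
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  return chunks;
}

// Usage: each value yielded by the generator arrives as a separate chunk
collectChunks(Readable.from(numberChunks(3))).then((chunks) => {
  console.log(chunks.length); // 3
});
```

Readable.from accepts any iterable or async iterable, so the same helper should also work for a stream built from an array or an async generator.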

Write Stream

  • A writable stream represents a destination that data can be written to chunk by chunk

  • E.g.: a file (fs.createWriteStream), an S3 object (PUT request), an HTTP response

  • Data is first copied into an in-memory buffer and then flushed to the destination

  • If you write data faster than the destination (for example, the disk) can absorb it, the buffered data keeps growing and can eventually fill all the memory. Backpressure is the mechanism that tells the writer to slow down before that happens

  • Every call to .write(data) returns a boolean. true means it is safe to continue; false means the internal buffer has reached its limit (highWaterMark) because the destination is lagging behind. When you receive false, you should stop writing until the buffered data has been flushed.

  • A writable stream emits a drain event once the buffered data has been flushed and it is safe to write again.

const fs = require("fs");

// Source to read from
const srcStream = fs.createReadStream("input.txt");

// Create a writable stream to a new file
const writerStream = fs.createWriteStream("output.txt");

// Write each chunk from the source to the file
srcStream.on("data", function (chunk) {
  const canContinue = writerStream.write(chunk);
  if (!canContinue) {
    // we are overflowing the destination, we should pause
    srcStream.pause();
    // we will resume when the destination stream is drained
    writerStream.once("drain", () => srcStream.resume());
  }
});

// Close the file once the source has no more data
srcStream.on("end", function () {
  writerStream.end();
});

// Listen for the stream to finish
writerStream.on("finish", function () {
  console.log("Finished");
});

// Listen for errors
writerStream.on("error", function (err) {
  console.log(err.stack);
});
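
The same write/drain handshake can also be written with async/await. The sketch below is only an illustration under assumptions: `createSlowWritable` is a hypothetical in-memory destination with a deliberately tiny highWaterMark and an artificial delay, standing in for a slow disk, and `writeAll` is a made-up helper name. `Writable`, `highWaterMark`, and `events.once` are real Node.js APIs.

```javascript
const { Writable } = require("stream");
const { once } = require("events");

// Hypothetical slow destination: tiny buffer and delayed writes
function createSlowWritable(received) {
  return new Writable({
    highWaterMark: 4, // bytes; small on purpose so write() returns false quickly
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setTimeout(callback, 1); // simulate a slow disk or network
    },
  });
}

// Write every chunk, waiting for "drain" whenever write() returns false
async function writeAll(writable, chunks) {
  let drainCount = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, "drain");
      drainCount++;
    }
  }
  // Register the "finish" listener before calling end() so it cannot be missed
  const finished = once(writable, "finish");
  writable.end();
  await finished;
  return drainCount;
}

// Usage: every chunk still arrives in order, the writer just waits when asked to
const received = [];
writeAll(createSlowWritable(received), ["aaaa", "bbbb", "cccc"]).then((drains) => {
  console.log(received.join(""), "waited for drain", drains, "time(s)");
});
```

Waiting on drain keeps the amount of buffered data bounded, at the cost of the writer running only as fast as the destination.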

Pipe

  • Connects a readable stream to a writable stream: all the data read from the readable is automatically copied to the writable, chunk by chunk, with backpressure handled for you.

  • Handles the end event: once all the data has been copied, the writable is ended as well. Errors are not forwarded, so each stream in the chain still needs its own error handler.

  • It returns the destination stream, so you can keep calling .pipe to connect many streams together. This is mostly useful with transform streams and allows us to create complex pipelines for data processing.

const fs = require("fs");

// all the data from input.txt will go to output.txt
fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"));

// An Express response is also a writable stream, so a file can be piped into it.
// Inside a route handler (res is the Express response), set the headers before piping:
res.setHeader("Content-Disposition", "attachment; filename=output.csv");
res.setHeader("Content-Type", "text/csv; charset=utf-8");
fs.createReadStream("input.csv").pipe(res);
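
Because .pipe returns the destination, calls can be chained through a transform stream. The sketch below assumes an in-memory sink instead of a file so it can run anywhere; `gzipText` and its `onDone` callback are hypothetical names for this illustration, while `zlib.createGzip` is the built-in gzip transform.

```javascript
const { Readable, Writable } = require("stream");
const zlib = require("zlib");

// Hypothetical helper: compress a string by chaining readable -> gzip -> sink
function gzipText(text, onDone) {
  const compressed = [];
  // In-memory destination that keeps every compressed chunk
  const sink = new Writable({
    write(chunk, encoding, callback) {
      compressed.push(chunk);
      callback();
    },
  });

  Readable.from([text])
    .pipe(zlib.createGzip()) // returns the gzip stream
    .pipe(sink) // returns the sink, so "finish" below is the sink's event
    .on("finish", () => onDone(Buffer.concat(compressed)));
}

// Usage: decompressing the result gives back the original text
gzipText("hello streams", (buffer) => {
  console.log(zlib.gunzipSync(buffer).toString());
});
```

Note that this sketch attaches no error handlers; with plain .pipe each stream would need its own.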

  • The stream module also provides pipeline: we pass an arbitrary number of streams followed by a callback function. The streams get piped in order and the callback is called when all the processing is done or when an error occurs. If there is an error, all the streams are properly destroyed for you. A chain of .pipe calls can be rewritten as follows (each stream variable is a placeholder for a stream created elsewhere):

import { pipeline } from 'stream'

pipeline(
  readableFileStream,
  decompressStream,
  decryptStream,
  convertStream,
  encryptStream,
  compressStream,
  writableFileStream,
  (err) => {
    if (err) {
      console.error(err)
    } else {
      console.log('Processing completed successfully')
    }
  }
)
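
For a version that can be run as-is, here is a small sketch using the promise-based pipeline from stream/promises, assuming Node.js 15 or later. The helper name `roundTrip` and the in-memory `collect` sink are made up for this example; the gzip and gunzip steps simply stand in for the compress and decompress stages listed above.

```javascript
const { Readable, Writable } = require("stream");
const { pipeline } = require("stream/promises");
const zlib = require("zlib");

// Hypothetical helper: compress and immediately decompress a string
async function roundTrip(text) {
  const chunks = [];
  // In-memory destination that keeps every chunk it receives
  const collect = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });

  // Resolves when every stream has finished, rejects if any of them fails
  await pipeline(
    Readable.from([text]),
    zlib.createGzip(),
    zlib.createGunzip(),
    collect
  );
  return Buffer.concat(chunks).toString();
}

// Usage: errors from any stage surface in a single catch
roundTrip("Processing completed successfully")
  .then((result) => console.log(result))
  .catch((err) => console.error(err));
```

The promise form fits naturally into async functions, while the callback form shown earlier works on older Node.js versions.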

Duplex Stream

  • A Duplex stream is essentially a stream that is both Readable and Writable.

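  • It is typically used as a middle step: it receives data from a readable stream chunk by chunk, handles it, and passes it on to a writable stream chunk by chunk

  • A Transform stream is one example of a duplex stream, where the output is computed from the input (PassThrough is the simplest Transform: it forwards the data unchanged)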

const { createWriteStream } = require("fs");
const { Readable, PassThrough, Transform } = require("stream");

// Example 1: PassThrough
// source that emits the string 'read'
const readableStream = Readable.from("read");
// destination
const writable = createWriteStream("output.txt");
// create a new duplex stream (PassThrough forwards data unchanged)
const logStream = new PassThrough();
// write extra data into the duplex stream
logStream.write("456");
// read the data from the readable stream
readableStream.pipe(logStream);
// write directly into the writable stream, then pipe the rest into it
writable.write("write");
logStream.pipe(writable);

// result of output.txt:
// write456read

// Example 2: Transform (run separately from example 1)
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const transformedChunk = chunk.toString().toUpperCase();
    callback(null, transformedChunk);
  }
}
const source = Readable.from("read");
const destination = createWriteStream("output.txt");
const uppercaseTransform = new UppercaseTransform();
source.pipe(uppercaseTransform).pipe(destination);

// result of output.txt:
// READ
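
Unlike a Transform, a plain Duplex keeps its readable and writable sides independent of each other. The following sketch is purely illustrative: `createCounterDuplex` is a hypothetical factory whose readable side emits a short sequence of numbers while its writable side just records whatever is written to it.

```javascript
const { Duplex } = require("stream");

// Hypothetical duplex: the two sides do not depend on each other
function createCounterDuplex(limit, written) {
  let current = 1;
  return new Duplex({
    // Readable side: emit "1", "2", ... up to limit, then end with null
    read() {
      this.push(current <= limit ? String(current++) : null);
    },
    // Writable side: record each incoming chunk
    write(chunk, encoding, callback) {
      written.push(chunk.toString());
      callback();
    },
  });
}

// Usage: data written in is stored, data read out comes from the counter
const written = [];
const duplex = createCounterDuplex(3, written);
duplex.on("data", (chunk) => console.log("read:", chunk.toString()));
duplex.on("end", () => console.log("written:", written.join(",")));
duplex.write("a");
duplex.write("b");
duplex.end();
```

A network socket behaves in a similar way: what you write to it and what you read from it are two separate flows.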

Last updated 8 months ago