Streaming

Readable Stream

  • Represents data coming from a source, which can be consumed chunk by chunk

  • For example: a file (fs.createReadStream), an S3 object (GET request), or an HTTP request (see the HTTP sketch after the file example below)

var fs = require("fs");
var { Readable } = require("stream");
var data = '';

// Create a readable stream to read the file
var readerStream = fs.createReadStream('file.txt');

readerStream.setEncoding('utf8'); // Set the encoding to utf8

// Create a readable stream we can push string chunks into manually
var stringStream = new Readable({ read() {} });
// push the chunks into the stream
for (let i = 0; i < 1; i++) {
  stringStream.push("123");
}
console.log("stream end");
// end the stream (pushing null signals there is no more data)
stringStream.push(null);


// listen for data chunks from the readable stream
readerStream.on('data', function(chunk) {
   data += chunk;
});

// listen for the end of the stream
readerStream.on('end', function() {
   console.log(data);
});

// listen for errors
readerStream.on('error', function(err) {
   console.log(err.stack);
});

// Alternative way to consume the chunks: an async iterator
// (use either this or the 'data' listener above, not both on the same stream;
// wrapped in an async function because top-level await requires an ES module)
(async () => {
  for await (const chunk of readerStream) {
    console.log(chunk);
  }
})();

console.log("Program Ended");

Writable Stream

  • Represents a destination that data can be written to chunk by chunk

  • E.g.: a file (fs.createWriteStream), an S3 object (PUT request), an HTTP response

  • Data is first copied into an in-memory buffer and then flushed to the destination

  • If you write data faster than the destination (e.g. the disk) can accept it, the buffered data keeps growing. If this keeps happening you can easily fill all the memory; this situation is called backpressure

  • In fact, every single time we call .write(data) on a stream, the stream returns a boolean value. If this value is true it is safe to continue; if it is false the destination is lagging behind and the stream is accumulating too much data to write. In that case you should slow down, or even better, stop writing until all the buffered data has been flushed.

  • A writable stream will emit a drain event once all the buffered data has been flushed and it's safe to write again.

var fs = require("fs");

var outputdata = "Hello";

const srcStream = createReadStream("input.txt");

// Create writeable stream to a new file
var writerStream = fs.createWriteStream("output.txt");

// write the data to file
writerSteam.on("data", function() {
  const canContinue = writerStream.write(data)
  if (!canContinue) {
    // we are overflowing the destination, we should pause
    srcStream.pause()
    // we will resume when the destination stream is drained
    writerStream.once('drain', () => Stream.resume())
  }
})

// write the data to the end of the file 
writerStream.end();

// listen when the stream is finished
writerStream.on("finish", function() {
  console.log("Finished");
});

// listen when there is error occured
writerStream.on("error", function(err) {
  console.log(err.stack);
});

Pipe

  • Connects the Readable stream to the Writable stream: all the data read from the readable is automagically™️ copied over to the writable stream, chunk by chunk.

  • Handles the end event properly: once all the data is copied over to the writable, both the readable and the writable are properly closed.

  • It returns a new instance of a stream, so you can keep using .pipe to connect many streams together. This is mostly useful with transform streams and allows us to create complex pipelines for data processing (see the gzip chaining sketch below).

var fs = require("fs");

// all the data from input.txt will go to output.txt
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));
  
// An Express response is also a writable stream,
// so a file can be piped into it as the response body.
// Set the headers before piping starts writing the body.
res.setHeader('Content-Disposition', 'attachment; filename=output.csv');
res.setHeader('Content-Type', 'text/csv; charset=utf-8');
fs.createReadStream('input.csv').pipe(res);
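
  • Since .pipe returns the destination stream, transform streams can be chained between a readable source and a writable destination. A small sketch using Node's built-in zlib gzip transform (the file names are just placeholders):

var fs = require("fs");
var zlib = require("zlib");

// read -> compress -> write, connected with chained .pipe calls
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())                     // transform stream: gzip-compresses the data
  .pipe(fs.createWriteStream('input.txt.gz')); // writable destination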

  • The pipeline() utility from the stream module accepts an arbitrary number of streams followed by a callback function. The streams get piped in order and the callback is called when all the processing is done or in case there's an error. If there's an error, all the streams are properly ended and destroyed for you. With this utility we can rewrite a chain of pipes as follows:

const { pipeline } = require('stream')

pipeline(
  readableFileStream,
  decompressStream,
  decryptStream,
  convertStream,
  encryptStream,
  compressStream,
  writableFileStream,
  (err) => {
    if (err) {
      console.error(err)
    } else {
      console.log('Processing completed successfully')
    }
  }
)
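
  • The stream names in the example above are placeholders. A concrete, runnable sketch of the same pattern, assuming input.txt.gz exists and using zlib's gunzip transform:

const { pipeline } = require('stream')
const fs = require('fs')
const zlib = require('zlib')

// read a gzipped file, decompress it, and write the result;
// pipeline() wires the streams together and handles errors and cleanup
pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error(err)
    } else {
      console.log('Decompression completed successfully')
    }
  }
)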

Duplex Stream

  • A Duplex stream is essentially a stream that is both Readable and Writable.

  • Its main purpose is to act as middleware: it reads data from a readable stream chunk by chunk, processes it, and writes the result to a writable stream chunk by chunk

  • A Transform stream is one example of a duplex stream

const { Readable, PassThrough } = require('stream');
const { createWriteStream } = require('fs');

// source: a readable stream created from a string
const readableStream = Readable.from('read');
// destination
const writable = createWriteStream('output.txt');
// a PassThrough is the simplest duplex stream: it passes through whatever is written to it
const logStream = new PassThrough();
// write an extra chunk into the duplex stream
logStream.write('456');
// pipe the readable stream into the duplex stream
readableStream.pipe(logStream);
// write directly into the writable stream
writable.write('write');
// pipe the duplex stream into the writable stream
logStream.pipe(writable);

// result of output.txt:
// write456read

// A Transform stream is a duplex stream whose output is computed from its input.
// This one upper-cases every chunk that passes through it.
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // upper-case the chunk and pass it downstream
    const transformedChunk = chunk.toString().toUpperCase();
    callback(null, transformedChunk);
  }
}

const upperSource = Readable.from('read');
const upperDest = createWriteStream('output-upper.txt');
const uppercaseTransform = new UppercaseTransform();
upperSource.pipe(uppercaseTransform).pipe(upperDest);

// result of output-upper.txt:
// READ
