Streaming
Readable Stream
A readable stream represents data coming from a source that can be consumed chunk by chunk.
For example: a file (fs.createReadStream), an S3 object (GET request), an HTTP request.
var fs = require("fs");
var { Readable } = require("stream");
var data = '';

// Create a readable stream that reads from a file
var readerStream = fs.createReadStream('file.txt');
readerStream.setEncoding('utf8'); // emit strings instead of raw Buffers

// Create a readable stream fed manually (Readable.from("some string") also works)
var stringStream = new Readable({ read() {} });
// push chunks into the stream
for (let i = 0; i < 1; i++) {
  stringStream.push("123");
}
console.log("stream end");
// end the stream by pushing null
stringStream.push(null);

// listen for data chunks from the readable stream
readerStream.on('data', function (chunk) {
  data += chunk;
});
// listen for the end of the stream
readerStream.on('end', function () {
  console.log(data);
});
// listen for errors
readerStream.on('error', function (err) {
  console.log(err.stack);
});

// alternatively, consume the chunks with an async iterator
// (for await must run inside an async function in CommonJS)
(async () => {
  for await (const chunk of stringStream) {
    console.log(chunk.toString());
  }
  console.log("Program Ended");
})();
Writable Stream
A writable stream represents a destination that data can be written to chunk by chunk.
E.g.: a file (fs.createWriteStream), an S3 object (PUT request), an HTTP response.
Data is first copied into an in-memory buffer and then flushed to the destination.
If you write data faster than the destination (for example a slow disk) can keep up with, the buffered data will grow. If this keeps happening you will easily fill all the memory; this situation is called backpressure.
In fact, every single time we call .write(data) on a stream, the stream returns a boolean value. If this value is true it is safe to continue; if the value is false it means that the destination is lagging behind and the stream is accumulating too much data to write. In this last case, when you receive false, you should slow down, or even better stop writing, until all the buffered data has been written out. A writable stream will emit a drain event once all the buffered data has been flushed and it's safe to write again.
var fs = require("fs");

// source to copy from
const srcStream = fs.createReadStream("input.txt");
// Create a writable stream to a new file
var writerStream = fs.createWriteStream("output.txt");

// write each chunk read from the source into the destination
srcStream.on("data", function (chunk) {
  const canContinue = writerStream.write(chunk);
  if (!canContinue) {
    // we are overflowing the destination, so pause the source
    srcStream.pause();
    // resume reading once the destination stream is drained
    writerStream.once("drain", () => srcStream.resume());
  }
});

// end the writable stream once the source has been fully read
srcStream.on("end", function () {
  writerStream.end();
});

// listen for when all buffered data has been flushed
writerStream.on("finish", function () {
  console.log("Finished");
});

// listen for errors
writerStream.on("error", function (err) {
  console.log(err.stack);
});
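An HTTP response, mentioned above as another destination, is also a writable stream. A minimal sketch using Node's built-in http module (values are illustrative):
var http = require("http");

http.createServer(function (req, res) {
  // the response (res) is a writable stream
  res.writeHead(200, { "Content-Type": "text/plain" });
  res.write("first chunk\n");
  res.write("second chunk\n");
  // end() flushes any remaining data and closes the stream
  res.end("done\n");
}).listen(3000);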
Pipe
Connects a readable stream to a writable stream: all the data read from the readable is automagically™️ copied over to the writable stream, chunk by chunk.
It handles the end event properly: once all the data is copied over to the writable, both the readable and the writable are properly closed.
It returns a new instance of a stream, so you can keep using .pipe to connect many streams together. This is mostly useful with transform streams and allows us to create complex pipelines for data processing.
var fs = require("fs");

// all the data from input.txt will go to output.txt
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));

// An Express response is also a writable stream,
// so a file can be piped into it to send it as a download.
// Set the headers before piping, since piping starts sending data.
res.setHeader('Content-Disposition', 'attachment; filename=output.csv');
res.setHeader('Content-Type', 'text/csv; charset=utf-8');
fs.createReadStream('input.csv').pipe(res);
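As a sketch of chaining several .pipe calls with a transform stream in between, the built-in zlib module provides a gzip transform (file names are placeholders):
var fs = require("fs");
var zlib = require("zlib");

// read -> compress (transform) -> write, all connected with .pipe
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));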
The pipeline utility from the stream module accepts an arbitrary number of streams followed by a callback function. The streams get piped in order and the callback is called when all the processing is done, or in case there's an error. If there's an error, all the streams are properly ended and destroyed for you. With this utility, a chain of streams can be written as follows:
const { pipeline } = require('stream')

// the intermediate streams below are placeholders for real
// readable / transform / writable streams
pipeline(
  readableFileStream,
  decompressStream,
  decryptStream,
  convertStream,
  encryptStream,
  compressStream,
  writableFileStream,
  (err) => {
    if (err) {
      console.error(err)
    } else {
      console.log('Processing completed successfully')
    }
  }
)
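Since the streams above are placeholders, here is a self-contained sketch of the same idea that gzips a file (assuming input.txt exists):
const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('input.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)
Newer Node versions also expose a promise-based version via require('stream/promises'), which can be awaited instead of taking a callback.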
Duplex Stream

A Duplex stream is essentially a stream that is both Readable and Writable, with the two sides operating independently (a TCP socket is a classic example).
It is often used as middleware: it reads data from a readable stream chunk by chunk, handles it, and writes the result to a writable stream chunk by chunk.
A Transform stream, where the output is computed from the input, is one example of a duplex stream.
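As a minimal sketch of what "both readable and writable" means, the Duplex constructor accepts separate read and write implementations (the data here is made up for illustration):
const { Duplex } = require('stream');

// readable side: emits a few numbers; writable side: just logs its input
const numbers = ['1', '2', '3'];
const duplex = new Duplex({
  read() {
    // push data on the readable side; null signals the end
    this.push(numbers.length ? numbers.shift() : null);
  },
  write(chunk, encoding, callback) {
    console.log('written:', chunk.toString());
    callback();
  }
});

duplex.on('data', (chunk) => console.log('read:', chunk.toString()));
duplex.write('hello');
duplex.end();
The example below uses PassThrough, a built-in duplex/transform stream that simply forwards whatever is written into it to its readable side: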
const { Readable, PassThrough } = require('stream');
const { createWriteStream } = require('fs');

// source readable stream created from a string
const readableStream = Readable.from('read');
// destination
const writable = createWriteStream('output.txt');
// create a new duplex (pass-through) stream
const logStream = new PassThrough();
// write an extra chunk directly into the duplex stream
logStream.write('456');
// pipe the data from the readable stream into the duplex stream
readableStream.pipe(logStream);
// write a chunk directly into the writable stream
writable.write('write');
// then flush the duplex stream's content into the writable stream
logStream.pipe(writable);
// result of output.txt:
// write456read
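A custom transform stream subclasses Transform and implements _transform, which receives each chunk, transforms it, and hands the result to the callback: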
const { Readable, Transform } = require('stream');
const { createWriteStream } = require('fs');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // upper-case each incoming chunk and pass it on
    const transformedChunk = chunk.toString().toUpperCase();
    callback(null, transformedChunk);
  }
}

const readableStream = Readable.from('read');
const writable = createWriteStream('output.txt');
const uppercaseTransform = new UppercaseTransform();
readableStream.pipe(uppercaseTransform).pipe(writable);
// result of output.txt:
// READ