Writing stream-based tools in Node.js requires caution

  • 2021-01-18 06:18:07
  • OfStack

Streams in Node.js are extremely powerful, providing support for processing potentially large files and abstracting data processing and passing in some scenarios. Because they are so convenient to use, in practice we often build utility functions and libraries on top of them — but overlooking certain characteristics of streams can leave those functions broken in some cases, or plant hidden landmines. This article offers two tips that you may find useful when writing stream-based tools.

1. Beware of EventEmitter memory leaks

In a function that may be called multiple times, if you need to add an event listener to a stream to perform some action, you need to be aware of the memory leaks caused by repeatedly adding listeners:


'use strict';
const fs = require('fs');
const co = require('co');

// Demo of the PITFALL: resolves with the next chunk from `stream`, or null
// when the stream is no longer readable — but every call adds fresh
// 'error'/'end' listeners that are never removed, so repeated calls
// accumulate listeners and trigger Node's "possible EventEmitter memory
// leak detected" warning (see the corrected version below).
function getSomeDataFromStream (stream) {
 let data = stream.read();
 if (data) return Promise.resolve(data); // buffered data available right now

 if (!stream.readable) return Promise.resolve(null); // stream already finished

 return new Promise((resolve, reject) => {
  // 'readable' uses .once, but 'error' and 'end' use .on and are never
  // removed — each call permanently adds two listeners (the leak).
  stream.once('readable', () => resolve(stream.read()));
  stream.on('error', reject);
  stream.on('end', resolve);
 })
}

// Open a readable stream over a large file, then drain it chunk by chunk:
// keep pulling via the helper until it reports end-of-stream (null),
// logging every chunk as it arrives.
const bigFileStream = fs.createReadStream('/Path/to/a/big/file');

co(function* () {
 for (;;) {
  const data = yield getSomeDataFromStream(bigFileStream);
  if (data === null) break;
  console.log(data);
 }
}).catch(console.error);

In the above code, the getSomeDataFromStream function settles its Promise by listening for the error and end events, reporting an error or the end of data. However, when executing the code, we will soon see a warning in the console: (node) warning: possible EventEmitter memory leak detected. 11 error listeners added. Use emitter.setMaxListeners() to increase limit. Each call to this function adds an extra error listener and end listener to the incoming stream. To avoid this potential memory leak, we need to ensure that after each call the function removes every listener that the call added, keeping the function clean:


// Resolve with the next chunk read from `stream`, or null once the stream
// is exhausted; reject on a stream error.
//
// Every listener added here is removed again before the promise settles,
// so repeated calls do not accumulate listeners (no EventEmitter leak).
function getSomeDataFromStream (stream) {
 let data = stream.read();
 if (data) return Promise.resolve(data); // buffered data available right now

 if (!stream.readable) return Promise.resolve(null); // already ended/destroyed

 return new Promise((resolve, reject) => {
  stream.once('readable', onData);
  stream.on('error', onError);
  stream.on('end', onEnd);

  function onData () {
   cleanup();
   resolve(stream.read());
  }

  function onError (err) {
   cleanup();
   reject(err);
  }

  // BUG FIX: the 'end' handler must also settle the promise — merely
  // removing the listeners would leave the caller waiting forever when
  // the stream ends without a further 'readable' event.
  function onEnd () {
   cleanup();
   resolve(null);
  }

  // Detach every listener this call added, keeping the stream clean.
  function cleanup () {
   stream.removeListener('readable', onData);
   stream.removeListener('error', onError);
   stream.removeListener('end', onEnd);
  }
 })
}

2. Make sure the utility function's callback is not called until all data has been processed

Utility functions often accept a callback parameter to be invoked with a specified value after all of the data in the stream has been processed; typically the callback invocation is attached to the stream's end event. But if the per-chunk handler performs time-consuming asynchronous operations, the callback may be called before all of the data has actually been processed:


'use strict';
const fs = require('fs');

// Open a readable stream over a large file for the demo below.
let stream = fs.createReadStream('/Path/to/a/big/file');

// Demo of the PITFALL: `callback` is attached to the 'end' event, but each
// chunk is processed asynchronously, so 'end' (all data *read*) can fire
// before the delayed processing completes — `callback` may run too early
// (see the corrected version below).
function processSomeData (stream, callback) {
 stream.on('data', (data) => {
  // Simulate a time-consuming asynchronous operation on each chunk.
  setTimeout(() => console.log(data), 2000);
 });

 stream.on('end', () => {
  // ...
  // BUG (intentional, for illustration): fires when the stream is fully
  // read, not when the async processing above has finished.
  callback()
 })
}

// Logs 'end' when the stream's 'end' event fires — possibly before the
// delayed per-chunk processing has finished.
processSomeData(stream, () => console.log('end'));

In the above code, the callback may be invoked before the data has been fully processed, because a stream's end event fires as soon as all the data has been read. So we need an extra check of whether the data has finished processing:


// Process each chunk of `stream` with an asynchronous operation and invoke
// `callback` only after the stream has ended AND every chunk has finished
// processing. On a stream error, invoke `callback(err)` instead.
//
// `callback(err?)` is guaranteed to be called at most once.
function processSomeData (stream, callback) {
 let count = 0;      // chunks received so far
 let finished = 0;   // chunks whose async processing has completed
 let isEnd = false;  // has the stream's 'end' event fired?
 let called = false; // guard: never invoke callback twice

 function invoke (err) {
  if (called) return;
  called = true;
  callback(err);
 }

 stream.on('data', (data) => {
  count++;
  // Data processing: some asynchronous time-consuming operation.
  setTimeout(() => {
   console.log(data);
   finished++;
   check();
  }, 2000);
 });

 stream.on('end', () => {
  isEnd = true;
  // ...
  check();
 })

 // BUG FIX: without this, a stream error would leave `callback`
 // uncalled forever.
 stream.on('error', invoke);

 function check () {
  // Fire only once the stream ended and all async work has drained.
  if (count === finished && isEnd) invoke()
 }
}

In this case, the callback will be fired after all data has been processed.


Related articles: