Bạn có thể giải quyết vấn đề này bằng cách sử dụng thư viện không đồng bộ. Bạn có thể sử dụng mẫu bên dưới cho bất kỳ luồng nào.
var AsyncLib = require('async');
var worker = function (payload, cb) {
//do something with payload and call callback
return cb();
};
var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);
var stream = //some readable stream;
stream.on('data', function(data) {
//no need to pause and resume
var payload = '//some payload';
streamQueue.push(payload);
})
.on('end', function() {
//register drain event on end and callback
streamQueue.drain = function () {
callback();
};
});