/**
* Copyright (c) 2015-2017 Guyon Roche
* LICENCE: MIT - please refer to LICENCE file included with this module
* or https://github.com/guyonroche/exceljs/blob/master/LICENSE
*/
'use strict';
var events = require('events');
var PromishLib = require('./promish');
var utils = require('./utils');
// =============================================================================
// FlowControl - Used to slow down streaming to manageable speed
// Implements a subset of Stream.Duplex: pipe() and write()
var FlowControl = module.exports = function (options) {
this.options = options = options || {};
// Buffer queue
this.queue = [];
// Consumer streams
this.pipes = [];
// Down-stream flow-control instances
this.children = [];
// Up-stream flow-control instances
this.parent = options.parent;
// Ensure we don't flush more than once at a time
this.flushing = false;
// determine timeout for flow control delays
if (options.gc) {
var gc = options.gc;
if (gc.getTimeout) {
this.getTimeout = gc.getTimeout;
} else {
// heap size below which we don't bother delaying
var threshold = gc.threshold !== undefined ? gc.threshold : 150000000;
// convert from heapsize to ms timeout
var divisor = gc.divisor !== undefined ? gc.divisor : 500000;
this.getTimeout = function () {
var memory = process.memoryUsage();
var heapSize = memory.heapTotal;
return heapSize < threshold ? 0 : Math.floor(heapSize / divisor);
};
}
} else {
this.getTimeout = null;
}
};
utils.inherits(FlowControl, events.EventEmitter, {
get name() {
return ['FlowControl', this.parent ? 'Child' : 'Root', this.corked ? 'corked' : 'open'].join(' ');
},
get corked() {
// We remain corked while we have children and at least one has data to consume
return this.children.length > 0 && this.children.some(function (child) {
return child.queue && child.queue.length;
});
},
get stem() {
// the decision to stem the incoming data depends on whether the children are corked
// and how many buffers we have backed up
return this.corked || !this.queue || this.queue.length > 2;
},
_write: function _write(dst, data, encoding) {
// Write to a single destination and return a promise
return new PromishLib.Promish(function (resolve, reject) {
dst.write(data, encoding, function (error) {
if (error) {
reject(error);
} else {
resolve();
}
});
});
},
_pipe: function _pipe(chunk) {
var _this = this;
// Write chunk to all pipes. A chunk with no data is the end
var promises = [];
this.pipes.forEach(function (pipe) {
if (chunk.data) {
if (pipe.sync) {
pipe.stream.write(chunk.data, chunk.encoding);
} else {
promises.push(_this._write(pipe.stream, chunk.data, chunk.encoding));
}
} else {
pipe.stream.end();
}
});
if (!promises.length) {
promises.push(PromishLib.Promish.resolve());
}
return PromishLib.Promish.all(promises).then(function () {
try {
chunk.callback();
} catch (e) {
// quietly ignore
}
});
},
_animate: function _animate() {
var count = 0;
var seq = ['|', '/', '-', '\\'];
var cr = '\x1B[0G'; // was '\033[0G'
return setInterval(function () {
process.stdout.write(seq[count++ % 4] + cr);
}, 100);
},
_delay: function _delay() {
var _this2 = this;
// in certain situations it may be useful to delay processing (e.g. for GC)
var timeout = this.getTimeout && this.getTimeout();
if (timeout) {
return new PromishLib.Promish(function (resolve) {
var anime = _this2._animate();
setTimeout(function () {
clearInterval(anime);
resolve();
}, timeout);
});
}
return PromishLib.Promish.resolve();
},
_flush: function _flush() {
var _this3 = this;
// If/while not corked and we have buffers to send, send them
if (this.queue && !this.flushing && !this.corked) {
if (this.queue.length) {
this.flushing = true;
this._delay().then(function () {
return _this3._pipe(_this3.queue.shift());
}).then(function () {
setImmediate(function () {
_this3.flushing = false;
_this3._flush();
});
});
}
if (!this.stem) {
// Signal up-stream that we're ready for more data
this.emit('drain');
}
}
},
write: function write(data, encoding, callback) {
// Called by up-stream pipe
if (encoding instanceof Function) {
callback = encoding;
encoding = 'utf8';
}
callback = callback || utils.nop;
if (!this.queue) {
throw new Error('Cannot write to stream after end');
}
// Always queue chunks and then flush
this.queue.push({
data: data,
encoding: encoding,
callback: callback
});
this._flush();
// restrict further incoming data if we have backed up buffers or
// the children are still busy
var stemFlow = this.corked || this.queue.length > 3;
return !stemFlow;
},
end: function end() {
var _this4 = this;
// Signal from up-stream
this.queue.push({
callback: function callback() {
_this4.queue = null;
_this4.emit('finish');
}
});
this._flush();
},
pipe: function pipe(stream, options) {
options = options || {};
// some streams don't call callbacks
var sync = options.sync || false;
this.pipes.push({
stream: stream,
sync: sync
});
},
unpipe: function unpipe(stream) {
this.pipes = this.pipes.filter(function (pipe) {
return pipe.stream !== stream;
});
},
createChild: function createChild() {
var _this5 = this;
// Create a down-stream flow-control
var options = Object.assign({ parent: this }, this.options);
var child = new FlowControl(options);
this.children.push(child);
child.on('drain', function () {
// a child is ready for more
_this5._flush();
});
child.on('finish', function () {
// One child has finished its stream. Remove it and continue
_this5.children = _this5.children.filter(function (item) {
return item !== child;
});
_this5._flush();
});
return child;
}
});
//# sourceMappingURL=flow-control.js.map