"use strict";

function _typeof(obj) { if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); }

function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }

function _defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } }

function _createClass(Constructor, protoProps, staticProps) { if (protoProps) _defineProperties(Constructor.prototype, protoProps); if (staticProps) _defineProperties(Constructor, staticProps); return Constructor; }

function _possibleConstructorReturn(self, call) { if (call && (_typeof(call) === "object" || typeof call === "function")) { return call; } return _assertThisInitialized(self); }

function _assertThisInitialized(self) { if (self === void 0) { throw new ReferenceError("this hasn't been initialised - super() hasn't been called"); } return self; }

function _getPrototypeOf(o) { _getPrototypeOf = Object.setPrototypeOf ? Object.getPrototypeOf : function _getPrototypeOf(o) { return o.__proto__ || Object.getPrototypeOf(o); }; return _getPrototypeOf(o); }

function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function"); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, writable: true, configurable: true } }); if (superClass) _setPrototypeOf(subClass, superClass); }

function _setPrototypeOf(o, p) { _setPrototypeOf = Object.setPrototypeOf || function _setPrototypeOf(o, p) { o.__proto__ = p; return o; }; return _setPrototypeOf(o, p); }

var events = require('events');

var PromiseLib = require('./promise');

var utils = require('./utils'); // =============================================================================
// FlowControl - Used to slow down streaming to manageable speed
// Implements a subset of Stream.Duplex: pipe() and write()


var FlowControl =
/*#__PURE__*/
function (_events$EventEmitter) {
  _inherits(FlowControl, _events$EventEmitter);

  function FlowControl(options) {
    var _this;

    _classCallCheck(this, FlowControl);

    _this = _possibleConstructorReturn(this, _getPrototypeOf(FlowControl).call(this));
    _this.options = options = options || {}; // Buffer queue

    _this.queue = []; // Consumer streams

    _this.pipes = []; // Down-stream flow-control instances

    _this.children = []; // Up-stream flow-control instances

    _this.parent = options.parent; // Ensure we don't flush more than once at a time

    _this.flushing = false; // determine timeout for flow control delays

    if (options.gc) {
      var _options = options,
          gc = _options.gc;

      if (gc.getTimeout) {
        _this.getTimeout = gc.getTimeout;
      } else {
        // heap size below which we don't bother delaying
        var threshold = gc.threshold !== undefined ? gc.threshold : 150000000; // convert from heapsize to ms timeout

        var divisor = gc.divisor !== undefined ? gc.divisor : 500000;

        _this.getTimeout = function () {
          var memory = process.memoryUsage();
          var heapSize = memory.heapTotal;
          return heapSize < threshold ? 0 : Math.floor(heapSize / divisor);
        };
      }
    } else {
      _this.getTimeout = null;
    }

    return _this;
  }

  _createClass(FlowControl, [{
    key: "_write",
    value: function _write(dst, data, encoding) {
      // Write to a single destination and return a promise
      return new PromiseLib.Promise(function (resolve, reject) {
        dst.write(data, encoding, function (error) {
          if (error) {
            reject(error);
          } else {
            resolve();
          }
        });
      });
    }
  }, {
    key: "_pipe",
    value: function _pipe(chunk) {
      var _this2 = this;

      // Write chunk to all pipes. A chunk with no data is the end
      var promises = [];
      this.pipes.forEach(function (pipe) {
        if (chunk.data) {
          if (pipe.sync) {
            pipe.stream.write(chunk.data, chunk.encoding);
          } else {
            promises.push(_this2._write(pipe.stream, chunk.data, chunk.encoding));
          }
        } else {
          pipe.stream.end();
        }
      });

      if (!promises.length) {
        promises.push(PromiseLib.Promise.resolve());
      }

      return PromiseLib.Promise.all(promises).then(function () {
        try {
          chunk.callback();
        } catch (e) {// quietly ignore
        }
      });
    }
  }, {
    key: "_animate",
    value: function _animate() {
      var count = 0;
      var seq = ['|', '/', '-', '\\'];
      var cr = "\x1B[0G"; // was '\033[0G'

      return setInterval(function () {
        process.stdout.write(seq[count++ % 4] + cr);
      }, 100);
    }
  }, {
    key: "_delay",
    value: function _delay() {
      var _this3 = this;

      // in certain situations it may be useful to delay processing (e.g. for GC)
      var timeout = this.getTimeout && this.getTimeout();

      if (timeout) {
        return new PromiseLib.Promise(function (resolve) {
          var anime = _this3._animate();

          setTimeout(function () {
            clearInterval(anime);
            resolve();
          }, timeout);
        });
      }

      return PromiseLib.Promise.resolve();
    }
  }, {
    key: "_flush",
    value: function _flush() {
      var _this4 = this;

      // If/while not corked and we have buffers to send, send them
      if (this.queue && !this.flushing && !this.corked) {
        if (this.queue.length) {
          this.flushing = true;

          this._delay().then(function () {
            return _this4._pipe(_this4.queue.shift());
          }).then(function () {
            setImmediate(function () {
              _this4.flushing = false;

              _this4._flush();
            });
          });
        }

        if (!this.stem) {
          // Signal up-stream that we're ready for more data
          this.emit('drain');
        }
      }
    }
  }, {
    key: "write",
    value: function write(data, encoding, callback) {
      // Called by up-stream pipe
      if (encoding instanceof Function) {
        callback = encoding;
        encoding = 'utf8';
      }

      callback = callback || utils.nop;

      if (!this.queue) {
        throw new Error('Cannot write to stream after end');
      } // Always queue chunks and then flush


      this.queue.push({
        data: data,
        encoding: encoding,
        callback: callback
      });

      this._flush(); // restrict further incoming data if we have backed up buffers or
      // the children are still busy


      var stemFlow = this.corked || this.queue.length > 3;
      return !stemFlow;
    }
  }, {
    key: "end",
    value: function end() {
      var _this5 = this;

      // Signal from up-stream
      this.queue.push({
        callback: function callback() {
          _this5.queue = null;

          _this5.emit('finish');
        }
      });

      this._flush();
    }
  }, {
    key: "pipe",
    value: function pipe(stream, options) {
      options = options || {}; // some streams don't call callbacks

      var sync = options.sync || false;
      this.pipes.push({
        stream: stream,
        sync: sync
      });
    }
  }, {
    key: "unpipe",
    value: function unpipe(stream) {
      this.pipes = this.pipes.filter(function (pipe) {
        return pipe.stream !== stream;
      });
    }
  }, {
    key: "createChild",
    value: function createChild() {
      var _this6 = this;

      // Create a down-stream flow-control
      var options = Object.assign({
        parent: this
      }, this.options);
      var child = new FlowControl(options);
      this.children.push(child);
      child.on('drain', function () {
        // a child is ready for more
        _this6._flush();
      });
      child.on('finish', function () {
        // One child has finished its stream. Remove it and continue
        _this6.children = _this6.children.filter(function (item) {
          return item !== child;
        });

        _this6._flush();
      });
      return child;
    }
  }, {
    key: "name",
    get: function get() {
      return ['FlowControl', this.parent ? 'Child' : 'Root', this.corked ? 'corked' : 'open'].join(' ');
    }
  }, {
    key: "corked",
    get: function get() {
      // We remain corked while we have children and at least one has data to consume
      return this.children.length > 0 && this.children.some(function (child) {
        return child.queue && child.queue.length;
      });
    }
  }, {
    key: "stem",
    get: function get() {
      // the decision to stem the incoming data depends on whether the children are corked
      // and how many buffers we have backed up
      return this.corked || !this.queue || this.queue.length > 2;
    }
  }]);

  return FlowControl;
}(events.EventEmitter);

module.exports = FlowControl;
//# sourceMappingURL=flow-control.js.map