const {EventEmitter} = require('events');

const utils = require('./utils');

// =============================================================================
// FlowControl - Used to slow down streaming to manageable speed
// Implements a subset of Stream.Duplex: pipe() and write()
class FlowControl extends EventEmitter {
  constructor(options) {
    super();

    this.options = options = options || {};

    // Buffer queue
    this.queue = [];

    // Consumer streams
    this.pipes = [];

    // Down-stream flow-control instances
    this.children = [];

    // Up-stream flow-control instances
    this.parent = options.parent;

    // Ensure we don't flush more than once at a time
    this.flushing = false;

    // determine timeout for flow control delays
    if (options.gc) {
      const {gc} = options;
      if (gc.getTimeout) {
        this.getTimeout = gc.getTimeout;
      } else {
        // heap size below which we don't bother delaying
        const threshold = gc.threshold !== undefined ? gc.threshold : 150000000;
        // convert from heapsize to ms timeout
        const divisor = gc.divisor !== undefined ? gc.divisor : 500000;
        this.getTimeout = function() {
          const memory = process.memoryUsage();
          const heapSize = memory.heapTotal;
          return heapSize < threshold ? 0 : Math.floor(heapSize / divisor);
        };
      }
    } else {
      this.getTimeout = null;
    }
  }

  get name() {
    return ['FlowControl', this.parent ? 'Child' : 'Root', this.corked ? 'corked' : 'open'].join(' ');
  }

  get corked() {
    // We remain corked while we have children and at least one has data to consume
    return this.children.length > 0 && this.children.some(child => child.queue && child.queue.length);
  }

  get stem() {
    // the decision to stem the incoming data depends on whether the children are corked
    // and how many buffers we have backed up
    return this.corked || !this.queue || this.queue.length > 2;
  }

  _write(dst, data, encoding) {
    // Write to a single destination and return a promise

    return new Promise((resolve, reject) => {
      dst.write(data, encoding, error => {
        if (error) {
          reject(error);
        } else {
          resolve();
        }
      });
    });
  }

  async _pipe(chunk) {
    // Write chunk to all pipes. A chunk with no data is the end
    const promises = [];
    this.pipes.forEach(pipe => {
      if (chunk.data) {
        if (pipe.sync) {
          pipe.stream.write(chunk.data, chunk.encoding);
        } else {
          promises.push(this._write(pipe.stream, chunk.data, chunk.encoding));
        }
      } else {
        pipe.stream.end();
      }
    });
    if (!promises.length) {
      promises.push(Promise.resolve());
    }
    await Promise.all(promises);
    try {
      chunk.callback();
    } catch (e) {
      // quietly ignore
    }
  }

  _animate() {
    let count = 0;
    const seq = ['|', '/', '-', '\\'];
    const cr = '\u001b[0G'; // was '\033[0G'
    return setInterval(() => {
      process.stdout.write(seq[count++ % 4] + cr);
    }, 100);
  }

  _delay() {
    // in certain situations it may be useful to delay processing (e.g. for GC)
    const timeout = this.getTimeout && this.getTimeout();
    if (timeout) {
      return new Promise(resolve => {
        const anime = this._animate();
        setTimeout(() => {
          clearInterval(anime);
          resolve();
        }, timeout);
      });
    }
    return Promise.resolve();
  }

  async _flush() {
    // If/while not corked and we have buffers to send, send them
    if (this.queue && !this.flushing && !this.corked) {
      if (this.queue.length) {
        this.flushing = true;
        await this._delay();
        await this._pipe(this.queue.shift());
        setImmediate(() => {
          this.flushing = false;
          this._flush();
        });
      }

      if (!this.stem) {
        // Signal up-stream that we're ready for more data
        this.emit('drain');
      }
    }
  }

  write(data, encoding, callback) {
    // Called by up-stream pipe
    if (encoding instanceof Function) {
      callback = encoding;
      encoding = 'utf8';
    }
    callback = callback || utils.nop;

    if (!this.queue) {
      throw new Error('Cannot write to stream after end');
    }

    // Always queue chunks and then flush
    this.queue.push({
      data,
      encoding,
      callback,
    });
    this._flush();

    // restrict further incoming data if we have backed up buffers or
    // the children are still busy
    const stemFlow = this.corked || this.queue.length > 3;
    return !stemFlow;
  }

  end() {
    // Signal from up-stream
    this.queue.push({
      callback: () => {
        this.queue = null;
        this.emit('finish');
      },
    });
    this._flush();
  }

  pipe(stream, options) {
    options = options || {};

    // some streams don't call callbacks
    const sync = options.sync || false;

    this.pipes.push({
      stream,
      sync,
    });
  }

  unpipe(stream) {
    this.pipes = this.pipes.filter(pipe => pipe.stream !== stream);
  }

  createChild() {
    // Create a down-stream flow-control
    const options = {
      parent: this,
      ...this.options,
    };
    const child = new FlowControl(options);
    this.children.push(child);

    child.on('drain', () => {
      // a child is ready for more
      this._flush();
    });

    child.on('finish', () => {
      // One child has finished its stream. Remove it and continue
      this.children = this.children.filter(item => item !== child);
      this._flush();
    });

    return child;
  }
}

module.exports = FlowControl;