//
//
//

'use strict';

var defs = require('./defs');
var constants = defs.constants;
var frame = require('./frame');
var HEARTBEAT = frame.HEARTBEAT;
var Mux = require('./mux').Mux;
var Buffer = require('safe-buffer').Buffer

var Duplex =
  require('stream').Duplex ||
  require('readable-stream/duplex');
var EventEmitter = require('events').EventEmitter;
var Heart = require('./heartbeat').Heart;

var methodName = require('./format').methodName;
var closeMsg = require('./format').closeMessage;
var inspect = require('./format').inspect;

var BitSet = require('./bitset').BitSet;
var inherits = require('util').inherits;
var fmt = require('util').format;
var PassThrough = require('stream').PassThrough ||
  require('readable-stream/passthrough');
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;

// High-water mark for channel write buffers, in 'objects' (which are
// encoded frames as buffers).
var DEFAULT_WRITE_HWM = 1024;
// If all the frames of a message (method, properties, content) total
// to less than this, copy them into a single buffer and write it all
// at once. Note that this is less than the minimum frame size: if it
// was greater, we might have to fragment the content.
var SINGLE_CHUNK_THRESHOLD = 2048;

function Connection(underlying) {
  EventEmitter.call( this );
  var stream = this.stream = wrapStream(underlying);
  this.muxer = new Mux(stream);

  // frames
  this.rest = Buffer.alloc(0);
  this.frameMax = constants.FRAME_MIN_SIZE;
  this.sentSinceLastCheck = false;
  this.recvSinceLastCheck = false;

  this.expectSocketClose = false;
  this.freeChannels = new BitSet();
  this.channels = [{channel: {accept: channel0(this)},
                    buffer: underlying}];
}
inherits(Connection, EventEmitter);

var C = Connection.prototype;

// Usual frame accept mode
function mainAccept(frame) {
  var rec = this.channels[frame.channel];
  if (rec) { return rec.channel.accept(frame); }
  // NB CHANNEL_ERROR may not be right, but I don't know what is ..
  else
    this.closeWithError(
      fmt('Frame on unknown channel %d', frame.channel),
      constants.CHANNEL_ERROR,
      new Error(fmt("Frame on unknown channel: %s",
                    inspect(frame, false))));
}

// Handle anything that comes through on channel 0, that's the
// connection control channel. This is only used once mainAccept is
// installed as the frame handler, after the opening handshake.
function channel0(connection) {
  return function(f) {
    // Once we get a 'close', we know 1. we'll get no more frames, and
    // 2. anything we send except close, or close-ok, will be
    // ignored. If we already sent 'close', this won't be invoked since
    // we're already in closing mode; if we didn't well we're not going
    // to send it now are we.
    if (f === HEARTBEAT); // ignore; it's already counted as activity
                          // on the socket, which is its purpose
    else if (f.id === defs.ConnectionClose) {
      // Oh. OK. I guess we're done here then.
      connection.sendMethod(0, defs.ConnectionCloseOk, {});
      var emsg = fmt('Connection closed: %s', closeMsg(f));
      var s = stackCapture(emsg);
      var e = new Error(emsg);
      e.code = f.fields.replyCode;
      if (isFatalError(e)) {
        connection.emit('error', e);
      }
      connection.toClosed(s, e);
    }
    else if (f.id === defs.ConnectionBlocked) {
      connection.emit('blocked', f.fields.reason);
    }
    else if (f.id === defs.ConnectionUnblocked) {
      connection.emit('unblocked');
    }
    else {
      connection.closeWithError(
        fmt("Unexpected frame on channel 0"),
        constants.UNEXPECTED_FRAME,
        new Error(fmt("Unexpected frame on channel 0: %s",
                      inspect(f, false))));
    }
  };
}

// This changed between versions, as did the codec, methods, etc. AMQP
// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
// much sense to generalise here.
C.sendProtocolHeader = function() {
  this.sendBytes(frame.PROTOCOL_HEADER);
};

/*
  The frighteningly complicated opening protocol (spec section 2.2.4):

     Client -> Server

       protocol header ->
         <- start
       start-ok ->
     .. next two zero or more times ..
         <- secure
       secure-ok ->
         <- tune
       tune-ok ->
       open ->
         <- open-ok

If I'm only supporting SASL's PLAIN mechanism (which I am for the time
being), it gets a bit easier since the server won't in general send
back a `secure`, it'll just send `tune` after the `start-ok`.
(SASL PLAIN: http://tools.ietf.org/html/rfc4616)

*/

C.open = function(allFields, openCallback0) {
  var self = this;
  var openCallback = openCallback0 || function() {};

  // This is where we'll put our negotiated values
  var tunedOptions = Object.create(allFields);

  function wait(k) {
    self.step(function(err, frame) {
      if (err !== null) bail(err);
      else if (frame.channel !== 0) {
        bail(new Error(
          fmt("Frame on channel != 0 during handshake: %s",
              inspect(frame, false))));
      }
      else k(frame);
    });
  }

  function expect(Method, k) {
    wait(function(frame) {
      if (frame.id === Method) k(frame);
      else {
        bail(new Error(
          fmt("Expected %s; got %s",
              methodName(Method), inspect(frame, false))));
      }
    });
  }

  function bail(err) {
    openCallback(err);
  }

  function send(Method) {
    // This can throw an exception if there's some problem with the
    // options; e.g., something is a string instead of a number.
    try { self.sendMethod(0, Method, tunedOptions); }
    catch (err) { bail(err); }
  }

  function negotiate(server, desired) {
    // We get sent values for channelMax, frameMax and heartbeat,
    // which we may accept or lower (subject to a minimum for
    // frameMax, but we'll leave that to the server to enforce). In
    // all cases, `0` really means "no limit", or rather the highest
    // value in the encoding, e.g., unsigned short for channelMax.
    if (server === 0 || desired === 0) {
      // i.e., whichever places a limit, if either
      return Math.max(server, desired);
    }
    else {
      return Math.min(server, desired);
    }
  }

  function onStart(start) {
    var mechanisms = start.fields.mechanisms.toString().split(' ');
    if (mechanisms.indexOf(allFields.mechanism) < 0) {
      bail(new Error(fmt('SASL mechanism %s is not provided by the server',
                         allFields.mechanism)));
      return;
    }
    send(defs.ConnectionStartOk);
    wait(afterStartOk);
  }

  function afterStartOk(reply) {
    switch (reply.id) {
    case defs.ConnectionSecure:
      bail(new Error(
        "Wasn't expecting to have to go through secure"));
      break;
    case defs.ConnectionClose:
      bail(new Error(fmt("Handshake terminated by server: %s",
                         closeMsg(reply))));
      break;
    case defs.ConnectionTune:
      var fields = reply.fields;
      tunedOptions.frameMax =
        negotiate(fields.frameMax, allFields.frameMax);
      tunedOptions.channelMax =
        negotiate(fields.channelMax, allFields.channelMax);
      tunedOptions.heartbeat =
        negotiate(fields.heartbeat, allFields.heartbeat);
      send(defs.ConnectionTuneOk);
      send(defs.ConnectionOpen);
      expect(defs.ConnectionOpenOk, onOpenOk);
      break;
    default:
      bail(new Error(
        fmt("Expected connection.secure, connection.close, " +
            "or connection.tune during handshake; got %s",
            inspect(reply, false))));
      break;
    }
  }

  function onOpenOk(openOk) {
    // Impose the maximum of the encoded value, if the negotiated
    // value is zero, meaning "no, no limits"
    self.channelMax = tunedOptions.channelMax || 0xffff;
    self.frameMax = tunedOptions.frameMax || 0xffffffff;
    // 0 means "no heartbeat", rather than "maximum period of
    // heartbeating"
    self.heartbeat = tunedOptions.heartbeat;
    self.heartbeater = self.startHeartbeater();
    self.accept = mainAccept;
    succeed(openOk);
  }

  // If the server closes the connection, it's probably because of
  // something we did
  function endWhileOpening(err) {
    bail(err || new Error('Socket closed abruptly ' +
                          'during opening handshake'));
  }

  this.stream.on('end', endWhileOpening);
  this.stream.on('error', endWhileOpening);

  function succeed(ok) {
    self.stream.removeListener('end', endWhileOpening);
    self.stream.removeListener('error', endWhileOpening);
    self.stream.on('error', self.onSocketError.bind(self));
    self.stream.on('end', self.onSocketError.bind(
      self, new Error('Unexpected close')));
    self.on('frameError', self.onSocketError.bind(self));
    self.acceptLoop();
    openCallback(null, ok);
  }

  // Now kick off the handshake by prompting the server
  this.sendProtocolHeader();
  expect(defs.ConnectionStart, onStart);
};

// Closing things: AMQP has a closing handshake that applies to
// closing both connects and channels. As the initiating party, I send
// Close, then ignore all frames until I see either CloseOK --
// which signifies that the other party has seen the Close and shut
// the connection or channel down, so it's fine to free resources; or
// Close, which means the other party also wanted to close the
// whatever, and I should send CloseOk so it can free resources,
// then go back to waiting for the CloseOk. If I receive a Close
// out of the blue, I should throw away any unsent frames (they will
// be ignored anyway) and send CloseOk, then clean up resources. In
// general, Close out of the blue signals an error (or a forced
// closure, which may as well be an error).
//
//  RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
//     |                               |                         [3]
//     |                               +------ send CloseOk ------+
//  recv Close                   recv CloseOk
//     |                               |
//     V                               V
//  Ended [4] ---- send CloseOk ---> Closed [5]
//
// [1] All frames accepted; getting a Close frame from the server
// moves to Ended; client may initiate a close by sending Close
// itself.
// [2] Client has initiated a close; only CloseOk or (simulataneously
// sent) Close is accepted.
// [3] Simultaneous close
// [4] Server won't send any more frames; accept no more frames, send
// CloseOk.
// [5] Fully closed, client will send no more, server will send no
// more. Signal 'close' or 'error'.
//
// There are two signalling mechanisms used in the API. The first is
// that calling `close` will return a promise, that will either
// resolve once the connection or channel is cleanly shut down, or
// will reject if the shutdown times out.
//
// The second is the 'close' and 'error' events. These are
// emitted as above. The events will fire *before* promises are
// resolved.

// Close the connection without even giving a reason. Typical.
C.close = function(closeCallback) {
  var k = closeCallback && function() { closeCallback(null); };
  this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
};

// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
// ignores these; maybe it logs them. The continuation will be invoked
// when the CloseOk has been received, and before the 'close' event.
C.closeBecause = function(reason, code, k) {
  this.sendMethod(0, defs.ConnectionClose, {
    replyText: reason,
    replyCode: code,
    methodId: 0, classId: 0
  });
  var s = stackCapture('closeBecause called: ' + reason);
  this.toClosing(s, k);
};

C.closeWithError = function(reason, code, error) {
  this.emit('error', error);
  this.closeBecause(reason, code);
};

C.onSocketError = function(err) {
  if (!this.expectSocketClose) {
    // forestall any more calls to onSocketError, since we're signed
    // up for `'error'` *and* `'end'`
    this.expectSocketClose = true;
    this.emit('error', err);
    var s = stackCapture('Socket error');
    this.toClosed(s, err);
  }
};

function invalidOp(msg, stack) {
  return function() {
    throw new IllegalOperationError(msg, stack);
  };
}

function invalidateSend(conn, msg, stack) {
  conn.sendMethod = conn.sendContent = conn.sendMessage =
    invalidOp(msg, stack);
}

// A close has been initiated. Repeat: a close has been initiated.
// This means we should not send more frames, anyway they will be
// ignored. We also have to shut down all the channels.
C.toClosing = function(capturedStack, k) {
  var send = this.sendMethod.bind(this);

  this.accept = function(f) {
    if (f.id === defs.ConnectionCloseOk) {
      if (k) k();
      var s = stackCapture('ConnectionCloseOk received');
      this.toClosed(s, undefined);
    }
    else if (f.id === defs.ConnectionClose) {
      send(0, defs.ConnectionCloseOk, {});
    }
    // else ignore frame
  };
  invalidateSend(this, 'Connection closing', capturedStack);
};

C._closeChannels = function(capturedStack) {
  for (var i = 1; i < this.channels.length; i++) {
    var ch = this.channels[i];
    if (ch !== null) {
      ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
    }
  }
};

// A close has been confirmed. Cease all communication.
C.toClosed = function(capturedStack, maybeErr) {
  this._closeChannels(capturedStack);
  var info = fmt('Connection closed (%s)',
                 (maybeErr) ? maybeErr.toString() : 'by client');
  // Tidy up, invalidate enverything, dynamite the bridges.
  invalidateSend(this, info, capturedStack);
  this.accept = invalidOp(info, capturedStack);
  this.close = function(cb) {
    cb && cb(new IllegalOperationError(info, capturedStack));
  };
  if (this.heartbeater) this.heartbeater.clear();
  // This is certainly true now, if it wasn't before
  this.expectSocketClose = true;
  this.stream.end();
  this.emit('close', maybeErr);
};

// ===

C.startHeartbeater = function() {
  if (this.heartbeat === 0) return null;
  else {
    var self = this;
    var hb = new Heart(this.heartbeat,
                       this.checkSend.bind(this),
                       this.checkRecv.bind(this));
    hb.on('timeout', function() {
      var hberr = new Error("Heartbeat timeout");
      self.emit('error', hberr);
      var s = stackCapture('Heartbeat timeout');
      self.toClosed(s, hberr);
    });
    hb.on('beat', function() {
      self.sendHeartbeat();
    });
    return hb;
  }
};

// I use an array to keep track of the channels, rather than an
// object. The channel identifiers are numbers, and allocated by the
// connection. If I try to allocate low numbers when they are
// available (which I do, by looking from the start of the bitset),
// this ought to keep the array small, and out of 'sparse array
// storage'. I also set entries to null, rather than deleting them, in
// the expectation that the next channel allocation will fill the slot
// again rather than growing the array. See
// http://www.html5rocks.com/en/tutorials/speed/v8/
C.freshChannel = function(channel, options) {
  var next = this.freeChannels.nextClearBit(1);
  if (next < 0 || next > this.channelMax)
    throw new Error("No channels left to allocate");
  this.freeChannels.set(next);

  var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
  var writeBuffer = new PassThrough({
    objectMode: true, highWaterMark: hwm
  });
  this.channels[next] = {channel: channel, buffer: writeBuffer};
  writeBuffer.on('drain', function() {
    channel.onBufferDrain();
  });
  this.muxer.pipeFrom(writeBuffer);
  return next;
};

C.releaseChannel = function(channel) {
  this.freeChannels.clear(channel);
  var buffer = this.channels[channel].buffer;
  this.muxer.unpipeFrom(buffer);
  this.channels[channel] = null;
};

C.acceptLoop = function() {
  var self = this;

  function go() {
    try {
      var f; while (f = self.recvFrame()) self.accept(f);
    }
    catch (e) {
      self.emit('frameError', e);
    }
  }
  self.stream.on('readable', go);
  go();
};

C.step = function(cb) {
  var self = this;
  function recv() {
    var f;
    try {
      f = self.recvFrame();
    }
    catch (e) {
      cb(e, null);
      return;
    }
    if (f) cb(null, f);
    else self.stream.once('readable', recv);
  }
  recv();
};

C.checkSend = function() {
  var check = this.sentSinceLastCheck;
  this.sentSinceLastCheck = false;
  return check;
}

C.checkRecv = function() {
  var check = this.recvSinceLastCheck;
  this.recvSinceLastCheck = false;
  return check;
}

C.sendBytes = function(bytes) {
  this.sentSinceLastCheck = true;
  this.stream.write(bytes);
};

C.sendHeartbeat = function() {
  return this.sendBytes(frame.HEARTBEAT_BUF);
};

var encodeMethod = defs.encodeMethod;
var encodeProperties = defs.encodeProperties;

C.sendMethod = function(channel, Method, fields) {
  var frame = encodeMethod(Method, channel, fields);
  this.sentSinceLastCheck = true;
  var buffer = this.channels[channel].buffer;
  return buffer.write(frame);
};

C.sendMessage = function(channel,
                         Method, fields,
                         Properties, props,
                         content) {
  if (!Buffer.isBuffer(content))
    throw new TypeError('content is not a buffer');

  var mframe = encodeMethod(Method, channel, fields);
  var pframe = encodeProperties(Properties, channel,
                                content.length, props);
  var buffer = this.channels[channel].buffer;
  this.sentSinceLastCheck = true;

  var methodHeaderLen = mframe.length + pframe.length;
  var bodyLen = (content.length > 0) ?
    content.length + FRAME_OVERHEAD : 0;
  var allLen = methodHeaderLen + bodyLen;

  if (allLen < SINGLE_CHUNK_THRESHOLD) {
    var all = Buffer.alloc(allLen);
    var offset = mframe.copy(all, 0);
    offset += pframe.copy(all, offset);

    if (bodyLen > 0)
      makeBodyFrame(channel, content).copy(all, offset);
    return buffer.write(all);
  }
  else {
    if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
      var both = Buffer.alloc(methodHeaderLen);
      var offset = mframe.copy(both, 0);
      pframe.copy(both, offset);
      buffer.write(both);
    }
    else {
      buffer.write(mframe);
      buffer.write(pframe);
    }
    return this.sendContent(channel, content);
  }
};

var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;

C.sendContent = function(channel, body) {
  if (!Buffer.isBuffer(body)) {
    throw new TypeError(fmt("Expected buffer; got %s", body));
  }
  var writeResult = true;
  var buffer = this.channels[channel].buffer;

  var maxBody = this.frameMax - FRAME_OVERHEAD;

  for (var offset = 0; offset < body.length; offset += maxBody) {
    var end = offset + maxBody;
    var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
    var bodyFrame = makeBodyFrame(channel, slice);
    writeResult = buffer.write(bodyFrame);
  }
  this.sentSinceLastCheck = true;
  return writeResult;
};

var parseFrame = frame.parseFrame;
var decodeFrame = frame.decodeFrame;

C.recvFrame = function() {
  // %%% identifying invariants might help here?
  var frame = parseFrame(this.rest, this.frameMax);

  if (!frame) {
    var incoming = this.stream.read();
    if (incoming === null) {
      return false;
    }
    else {
      this.recvSinceLastCheck = true;
      this.rest = Buffer.concat([this.rest, incoming]);
      return this.recvFrame();
    }
  }
  else {
    this.rest = frame.rest;
    return decodeFrame(frame);
  }
};

function wrapStream(s) {
  if (s instanceof Duplex) return s;
  else {
    var ws = new Duplex();
    ws.wrap(s); //wraps the readable side of things
    ws._write = function(chunk, encoding, callback) {
      return s.write(chunk, encoding, callback);
    };
    return ws;
  }
}

function isFatalError(error) {
  switch (error && error.code) {
  case defs.constants.CONNECTION_FORCED:
  case defs.constants.REPLY_SUCCESS:
    return false;
  default:
    return true;
  }
}

module.exports.Connection = Connection;
module.exports.isFatalError = isFatalError;