// Module-level dependencies and constants for the CSV parser stream.
var extended = require("../extended"),
    isUndefined = extended.isUndefined,
    spreadArgs = extended.spreadArgs,
    util = require("util"),
    // NOTE(review): `out` and `fs` are not referenced in the visible part of this file.
    out = process.stdout,
    stream = require("stream"),
    // Matches a line made up solely of whitespace, commas and empty quote pairs ('' or "").
    EMPTY = /^\s*(?:''|"")?\s*(?:,\s*(?:''|"")?\s*)*$/,
    // Delimiter used when the caller does not supply one.
    DEFAULT_DELIMITER = ",",
    createParser = require("./parser"),
    fs = require("fs"),
    StringDecoder = require('string_decoder').StringDecoder,
    // True when Transform.prototype provides isPaused(); when false, emit() tracks
    // pause/resume itself via the "pause" and "resume" events.
    hasIsPaused = !!stream.Transform.prototype.isPaused;

/**
 * Transform stream that turns CSV text into rows.
 *
 * The caller's `options` object is never modified: the constructor works on a
 * shallow copy (same prototype, own properties copied). Previously the escaped
 * delimiter and the defaulted `objectMode` were written back onto the caller's
 * object, so reusing one options object with a delimiter that needs escaping
 * made the second construction fail the one-character check.
 *
 * @param {Object} [options] parser options. Read here: `objectMode` (defaults
 *     to true), `delimiter` (single character, defaults to ","), `headers`,
 *     `renameHeaders`, `ignoreEmpty`, `discardUnmappedColumns`,
 *     `strictColumnHandling`. The copy is also handed to `stream.Transform`
 *     and to `createParser`.
 * @throws {Error} when `delimiter` is longer than one character.
 */
function ParserStream(options) {
    var source = options || {},
        // Preserve the prototype so inherited option lookups keep working.
        opts = Object.create(Object.getPrototypeOf(source)),
        delimiter = DEFAULT_DELIMITER;

    Object.keys(source).forEach(function (key) {
        opts[key] = source[key];
    });

    opts.objectMode = extended.has(opts, "objectMode") ? opts.objectMode : true;
    stream.Transform.call(this, opts);
    this.lines = "";
    this.decoder = new StringDecoder();
    this._parsedHeaders = false;
    this._rowCount = -1;
    this._emitData = false;
    if (extended.has(opts, "delimiter")) {
        delimiter = opts.delimiter;
        if (delimiter.length > 1) {
            throw new Error("delimiter option must be one character long");
        }
        // The parser receives the delimiter in escaped form.
        delimiter = extended.escape(delimiter);
    }
    opts.delimiter = delimiter;
    this.parser = createParser(opts);
    this._headers = opts.headers;
    this._renameHeaders = opts.renameHeaders;
    this._ignoreEmpty = opts.ignoreEmpty;
    this._discardUnmappedColumns = opts.discardUnmappedColumns;
    this._strictColumnHandling = opts.strictColumnHandling;
    this.__objectMode = opts.objectMode;
    // Rows parsed while the stream is paused wait here as [row, index] pairs.
    this.__buffered = [];
    return this;
}

// ParserStream inherits from stream.Transform.
util.inherits(ParserStream, stream.Transform);

// Capture the inherited `on` and `emit` so the overrides defined on the
// prototype below can delegate to them.
var origOn = ParserStream.prototype.on,
    origEmit = ParserStream.prototype.emit;


extended(ParserStream).extend({

    // Completion callback parked while the stream is paused; __flushPausedBuffer
    // clears it and invokes it once the buffered rows have been emitted.
    __pausedDone: null,

    // Set by emit() so that the "end" event is forwarded only once.
    __endEmitted: false,

    // NOTE(review): not read or written anywhere in the visible part of this file.
    __emittedData: false,

    __handleLine: function __parseLineData(line, index, ignore, next) {
        var ignoreEmpty = this._ignoreEmpty, self = this;
        if (extended.isBoolean(ignoreEmpty) && ignoreEmpty && (!line || EMPTY.test(line.join("")))) {
            return next(null, null);
        }
        if (!ignore) {
            this.__transform(line, function (err, line) {
                if (err) {
                    next(err);
                } else {
                    self.__validate(line, function (err, isValid, reason) {
                        if (err) {
                            next(err);
                        } else if (isValid) {
                            next(null, line);
                        } else {
                            self.emit("data-invalid", line, index, reason);
                            next(null, null);
                        }
                    });
                }
            });
        } else {
            return next(null, line);
        }
    },

    __processRows: function (rows, data, cb) {
        var self = this, count;
        extended.asyncEach(rows, function (row, cb) {
            if (row) {
                self.__handleLine(row, (count = ++self._rowCount), false, function (err, dataRow) {
                    if (err) {
                        cb(err);
                    } else {
                        if (dataRow) {
                            if (!self.isStreamPaused()) {
                                self.__emitRecord(dataRow, count);
                            } else {
                                self.__buffered.push([dataRow, count]);
                            }
                        } else {
                            count = --self._rowCount;
                        }
                        cb();
                    }
                });
            }
        }, function (err) {
            if (err) {
                cb(err);
            } else {
                cb(null, data.line);
            }
        });
    },

    __processHeaders: function (rows, cb) {
        var headers = this._headers,
            renameHeaders = this._renameHeaders,
            discardUnmappedColumns = this._discardUnmappedColumns,
            strictColumnHandling = this._strictColumnHandling,
            self = this;

        function headerHandler(err, headers) {
            if (err) {
                cb(err);
            } else if (extended.isArray(headers)) {
                var headersLength = headers.length,
                    orig = self.__transform;
                self.__transform = function (data, cb) {
                    var ret = {}, i = -1, val;
                    if (data.length > headersLength) {
                        if (discardUnmappedColumns) {
                            data.splice(headersLength);
                        } else if (strictColumnHandling) {
                            self.emit("data-invalid", data);
                            return orig(null, cb);
                        } else {
                            self.emit("error", new Error("Unexpected Error: column header mismatch expected: " + headersLength + " columns got: " + data.length));
                            return orig(null, cb);
                        }
                    } else if (strictColumnHandling && (data.length < headersLength)) {
                        self.emit("data-invalid", data);
                        return orig(null, cb);
                    }
                    while (++i < headersLength) {
                        if (isUndefined(headers[i])) {
                            continue;
                        }
                        val = data[i];
                        ret[headers[i]] = isUndefined(val) ? '' : val;
                    }

                    return orig(ret, cb);
                };
            }
            self._parsedHeaders = true;
            cb(null);
        }

        if (renameHeaders) {
            if (Array.isArray(headers)) {
                rows.shift();
                headerHandler(null, headers);
            } else {
                self.emit("error", new Error("Error renaming headers: new headers must be provided in an array"));
            }
        } else if (extended.isBoolean(headers) && headers) {
            this.__handleLine(rows.shift(), 0, true, headerHandler);
        } else {
            headerHandler(null, headers);
        }

    },

    _parse: function _parseLine(data, hasMoreData, cb) {
        var rows, self = this;
        try {
            data = this.parser(data, hasMoreData);
            rows = data.rows;
            if (rows.length) {
                if (!this._parsedHeaders) {
                    this.__processHeaders(rows, function (err) {
                        if (err) {
                            cb(err);
                        } else {
                            self.__processRows(rows, data, cb);
                        }
                    });
                } else {
                    this.__processRows(rows, data, cb);
                }
            } else {
                cb(null, data.line);
            }
        } catch (e) {
            cb(e);
        }
    },

    __emitRecord: function (dataRow, count) {
        if (this._emitData) {
            this.push(this.__objectMode ? dataRow : JSON.stringify(dataRow));
        }
    },

    __removeBOM: function (data) {
        // Catches EFBBBF (UTF-8 BOM) because the buffer-to-string
        // conversion translates it to FEFF (UTF-16 BOM)
        if (data && typeof data == 'string' && data.charCodeAt(0) == '0xFEFF') {
            return data.slice(1);
        }
        return data;
    },

    _transform: function (data, encoding, done) {
        var lines = this.lines,
            lineData = (lines + this.decoder.write(data)),
            self = this;
        if (lineData.length > 1) {
            lineData = this.__removeBOM(lineData);
            this._parse(lineData, true, function (err, lineData) {
                if (err) {
                    done(err);
                } else {
                    self.lines = lineData;
                    if (!self.isStreamPaused()) {
                        done();
                    } else {
                        self.__pausedDone = done;
                    }
                }
            });
        } else {
            this.lines = lineData;
            if (!this.isStreamPaused()) {
                done();
            } else {
                this.__pausedDone = done;
            }
        }

    },

    /**
     * Invokes the flush callback. If the callback itself throws, the thrown
     * value is passed to a second invocation of the same callback.
     * NOTE(review): this means `callback` can run twice — confirm that is intended.
     *
     * @param {Function} callback flush completion callback.
     */
    __doFlush: function (callback) {
        try {
            callback();
        } catch (e) {
            callback(e);
        }
    },

    _flush: function (callback) {
        var self = this;
        if (this.lines) {
            this._parse(this.lines, false, function (err) {
                if (err) {
                    callback(err);
                } else if (!self.isStreamPaused()) {
                    self.__doFlush(callback);
                } else {
                    self.__pausedDone = function () {
                        self.__doFlush(callback);
                    };
                }
            });
        } else {
            if (!this.isStreamPaused()) {
                this.__doFlush(callback);
            } else {
                this.__pausedDone = function () {
                    self.__doFlush(callback);
                };
            }
        }
    },

    /**
     * Default validator: accepts every row. Replaced by `validate(cb)`.
     *
     * @param {*} data row to validate (not inspected).
     * @param {Function} next called as `next(null, true)`.
     */
    __validate: function (data, next) {
        return next(null, true);
    },

    /**
     * Default transform: passes the row through unchanged. Replaced by
     * `transform(cb)` and wrapped by `__processHeaders` when headers apply.
     *
     * @param {*} data row to transform.
     * @param {Function} next called as `next(null, data)`.
     */
    __transform: function (data, next) {
        return next(null, data);
    },

    __flushPausedBuffer: function () {
        var buffered = this.__buffered, l = buffered.length;
        if (l) {
            var entry;
            while (buffered.length) {
                entry = buffered.shift();
                this.__emitRecord(entry[0], entry[1]);
                //handle case where paused is called while emitting data
                if (this.isStreamPaused()) {
                    return;
                }
            }
            buffered.length = 0;
        }
        if (this.__pausedDone) {
            var done = this.__pausedDone;
            this.__pausedDone = null;
            done();
        }
    },

    /**
     * Reports the pause flag tracked by this class. In the visible code the
     * flag is only maintained by emit() when `hasIsPaused` is false, so it is
     * undefined (falsy) otherwise.
     *
     * @returns {Boolean|undefined} the current `__paused` value.
     */
    isStreamPaused: function () {
        return this.__paused;
    },

    emit: function (event) {
        if (event === "end") {
            if (!this.__endEmitted) {
                this.__endEmitted = true;
                spreadArgs(origEmit, ["end", ++this._rowCount], this);
            }
        } else {
            if (!hasIsPaused) {
                if (event === "pause") {
                    this.__paused = true;
                } else if (event === "resume") {
                    this.__paused = false;
                    this.__flushPausedBuffer();
                }
            }
            spreadArgs(origEmit, arguments, this);
        }
    },

    on: function (evt) {
        if (evt === "data" || evt === "readable") {
            this._emitData = true;
        }
        spreadArgs(origOn, arguments, this);
        return this;
    },

    validate: function (cb) {
        if (!extended.isFunction(cb)) {
            this.emit("error", new TypeError("fast-csv.Parser#validate requires a function"));
        }
        if (cb.length === 2) {
            this.__validate = cb;
        } else {
            this.__validate = function (data, next) {
                return next(null, cb(data));
            };
        }
        return this;
    },
    transform: function (cb) {
        if (!extended.isFunction(cb)) {
            this.emit("error", new TypeError("fast-csv.Parser#transform requires a function"));
        }
        if (cb.length === 2) {
            this.__transform = cb;
        } else {
            this.__transform = function (data, next) {
                return next(null, cb(data));
            };
        }
        return this;
    }

});

module.exports = ParserStream;