From 4b63a2cfb2c738ebdb525221ad042c67ab409cd3 Mon Sep 17 00:00:00 2001 From: TheThing Date: Thu, 9 Nov 2023 11:21:38 +0000 Subject: [PATCH] Very basic and workable HLS ingestion server for ffmpeg --- .npmrc | 1 + api/formidable/file.js | 81 ++++ api/formidable/incoming_form.js | 580 +++++++++++++++++++++++++++ api/formidable/index.js | 3 + api/formidable/json_parser.js | 30 ++ api/formidable/multipart_parser.js | 332 +++++++++++++++ api/formidable/octet_parser.js | 20 + api/formidable/querystring_parser.js | 27 ++ api/ingest.mjs | 81 ++++ api/server.mjs | 96 +++++ api/static_routes.mjs | 16 + index.mjs | 35 ++ package.json | 34 ++ upload/.gitkeep | 0 14 files changed, 1336 insertions(+) create mode 100644 .npmrc create mode 100644 api/formidable/file.js create mode 100644 api/formidable/incoming_form.js create mode 100644 api/formidable/index.js create mode 100644 api/formidable/json_parser.js create mode 100644 api/formidable/multipart_parser.js create mode 100644 api/formidable/octet_parser.js create mode 100644 api/formidable/querystring_parser.js create mode 100644 api/ingest.mjs create mode 100644 api/server.mjs create mode 100644 api/static_routes.mjs create mode 100644 index.mjs create mode 100644 package.json create mode 100644 upload/.gitkeep diff --git a/.npmrc b/.npmrc new file mode 100644 index 0000000..9cf9495 --- /dev/null +++ b/.npmrc @@ -0,0 +1 @@ +package-lock=false \ No newline at end of file diff --git a/api/formidable/file.js b/api/formidable/file.js new file mode 100644 index 0000000..50d34c0 --- /dev/null +++ b/api/formidable/file.js @@ -0,0 +1,81 @@ +if (global.GENTLY) require = GENTLY.hijack(require); + +var util = require('util'), + fs = require('fs'), + EventEmitter = require('events').EventEmitter, + crypto = require('crypto'); + +function File(properties) { + EventEmitter.call(this); + + this.size = 0; + this.path = null; + this.name = null; + this.type = null; + this.hash = null; + this.lastModifiedDate = null; + + this._writeStream = null; + + for (var key in properties) { + this[key] = properties[key]; + } + + if(typeof this.hash === 'string') { + this.hash = crypto.createHash(properties.hash); + } else { + this.hash = null; + } +} +module.exports = File; +util.inherits(File, EventEmitter); + +File.prototype.open = function() { + this._writeStream = new fs.WriteStream(this.path); +}; + +File.prototype.toJSON = function() { + var json = { + size: this.size, + path: this.path, + name: this.name, + type: this.type, + mtime: this.lastModifiedDate, + length: this.length, + filename: this.filename, + mime: this.mime + }; + if (this.hash && this.hash != "") { + json.hash = this.hash; + } + return json; +}; + +File.prototype.write = function(buffer, cb) { + var self = this; + if (self.hash) { + self.hash.update(buffer); + } + + if (this._writeStream.closed) { + return cb(); + } + + this._writeStream.write(buffer, function() { + self.lastModifiedDate = new Date(); + self.size += buffer.length; + self.emit('progress', self.size); + cb(); + }); +}; + +File.prototype.end = function(cb) { + var self = this; + if (self.hash) { + self.hash = self.hash.digest('hex'); + } + this._writeStream.end(function() { + self.emit('end'); + cb(); + }); +}; diff --git a/api/formidable/incoming_form.js b/api/formidable/incoming_form.js new file mode 100644 index 0000000..9e218e2 --- /dev/null +++ b/api/formidable/incoming_form.js @@ -0,0 +1,580 @@ +if (global.GENTLY) require = GENTLY.hijack(require); + +var crypto = require('crypto'); +var fs = require('fs'); +var util = require('util'), + path = require('path'), + File = require('./file'), + MultipartParser = require('./multipart_parser').MultipartParser, + QuerystringParser = require('./querystring_parser').QuerystringParser, + OctetParser = require('./octet_parser').OctetParser, + JSONParser = require('./json_parser').JSONParser, + StringDecoder = require('string_decoder').StringDecoder, + EventEmitter = require('events').EventEmitter, + Stream = require('stream').Stream, + os = require('os'); + +function IncomingForm(opts) { + if (!(this instanceof IncomingForm)) return new IncomingForm(opts); + EventEmitter.call(this); + + opts=opts||{}; + + this.error = null; + this.ended = false; + + this.maxFields = opts.maxFields || 1000; + this.maxFieldsSize = opts.maxFieldsSize || 20 * 1024 * 1024; + this.maxFileSize = opts.maxFileSize || 200 * 1024 * 1024; + this.keepExtensions = opts.keepExtensions || false; + this.uploadDir = opts.uploadDir || (os.tmpdir && os.tmpdir()) || os.tmpDir(); + this.encoding = opts.encoding || 'utf-8'; + this.headers = null; + this.type = null; + this.hash = opts.hash || false; + this.multiples = opts.multiples || false; + + this.bytesReceived = null; + this.bytesExpected = null; + + this._parser = null; + this._flushing = 0; + this._fieldsSize = 0; + this._fileSize = 0; + this.openedFiles = []; + + return this; +} +util.inherits(IncomingForm, EventEmitter); +exports.IncomingForm = IncomingForm; + +IncomingForm.prototype.parse = function(req, cb) { + this.pause = function() { + try { + req.pause(); + } catch (err) { + // the stream was destroyed + if (!this.ended) { + // before it was completed, crash & burn + this._error(err); + } + return false; + } + return true; + }; + + this.resume = function() { + try { + req.resume(); + } catch (err) { + // the stream was destroyed + if (!this.ended) { + // before it was completed, crash & burn + this._error(err); + } + return false; + } + + return true; + }; + + // Setup callback first, so we don't miss anything from data events emitted + // immediately. + if (cb) { + var fields = {}, files = {}; + this + .on('field', function(name, value) { + fields[name] = value; + }) + .on('file', function(name, file) { + if (this.multiples) { + if (files[name]) { + if (!Array.isArray(files[name])) { + files[name] = [files[name]]; + } + files[name].push(file); + } else { + files[name] = file; + } + } else { + files[name] = file; + } + }) + .on('error', function(err) { + cb(err, fields, files); + }) + .on('end', function() { + cb(null, fields, files); + }); + } + + // Parse headers and setup the parser, ready to start listening for data. + this.writeHeaders(req.headers); + + // Start listening for data. + var self = this; + req + .on('error', function(err) { + if (err.message === 'aborted' && self.headers['transfer-encoding'] === 'chunked') { + return; + } + self._error(err); + }) + .on('aborted', function() { + if (self.headers['transfer-encoding'] === 'chunked') { + var err = self._parser.end(); + if (err) { + self._error(err); + } + return; + } + + self.emit('aborted'); + self._error(new Error('Request aborted')); + }) + .on('data', function(buffer) { + self.write(buffer); + }) + .on('end', function() { + if (self.error) { + return; + } + + var err = self._parser.end(); + if (err) { + self._error(err); + } + }); + + return this; +}; + +IncomingForm.prototype.writeHeaders = function(headers) { + this.headers = headers; + this._parseContentLength(); + this._parseContentType(); +}; + +IncomingForm.prototype.write = function(buffer) { + if (this.error) { + return; + } + if (!this._parser) { + this._error(new Error('uninitialized parser')); + return; + } + if (typeof this._parser.write !== 'function') { + this._error(new Error('did not expect data')); + return; + } + + this.bytesReceived += buffer.length; + this.emit('progress', this.bytesReceived, this.bytesExpected); + + var bytesParsed = this._parser.write(buffer); + if (bytesParsed !== buffer.length) { + this._error(new Error('parser error, '+bytesParsed+' of '+buffer.length+' bytes parsed')); + } + + return bytesParsed; +}; + +IncomingForm.prototype.pause = function() { + // this does nothing, unless overwritten in IncomingForm.parse + return false; +}; + +IncomingForm.prototype.resume = function() { + // this does nothing, unless overwritten in IncomingForm.parse + return false; +}; + +IncomingForm.prototype.onPart = function(part) { + // this method can be overwritten by the user + this.handlePart(part); +}; + +IncomingForm.prototype.handlePart = function(part) { + var self = this; + + // This MUST check exactly for undefined. You can not change it to !part.filename. + if (part.filename === undefined) { + var value = '' + , decoder = new StringDecoder(this.encoding); + + part.on('data', function(buffer) { + self._fieldsSize += buffer.length; + if (self._fieldsSize > self.maxFieldsSize) { + self._error(new Error('maxFieldsSize exceeded, received '+self._fieldsSize+' bytes of field data')); + return; + } + value += decoder.write(buffer); + }); + + part.on('end', function() { + self.emit('field', part.name, value); + }); + return; + } + + this._flushing++; + + var file = new File({ + path: this._uploadPath(part.filename), + name: part.filename, + type: part.mime, + hash: self.hash + }); + + this.emit('fileBegin', part.name, file); + + file.open(); + this.openedFiles.push(file); + + part.on('data', function(buffer) { + self._fileSize += buffer.length; + if (self._fileSize > self.maxFileSize) { + self._error(new Error('maxFileSize exceeded, received '+self._fileSize+' bytes of file data')); + return; + } + if (buffer.length == 0) { + return; + } + self.pause(); + file.write(buffer, function() { + self.resume(); + }); + }); + + part.on('end', function() { + file.end(function() { + self._flushing--; + self.emit('file', part.name, file); + self._maybeEnd(); + }); + }); +}; + +function dummyParser(self) { + return { + end: function () { + self.ended = true; + self._maybeEnd(); + return null; + } + }; +} + +IncomingForm.prototype._parseContentType = function() { + if (this.bytesExpected === 0) { + this._parser = dummyParser(this); + return; + } + + if (this.headers['transfer-encoding'] === 'chunked') { + this._initOctetStream(); + return; + } + + if (!this.headers['content-type']) { + this._error(new Error('bad content-type header, no content-type')); + return; + } + + if (this.headers['content-type'].match(/octet-stream/i)) { + this._initOctetStream(); + return; + } + + if (this.headers['content-type'].match(/urlencoded/i)) { + this._initUrlencoded(); + return; + } + + if (this.headers['content-type'].match(/multipart/i)) { + var m = this.headers['content-type'].match(/boundary=(?:"([^"]+)"|([^;]+))/i); + if (m) { + this._initMultipart(m[1] || m[2]); + } else { + this._error(new Error('bad content-type header, no multipart boundary')); + } + return; + } + + if (this.headers['content-type'].match(/json/i)) { + this._initJSONencoded(); + return; + } + + this._error(new Error('bad content-type header, unknown content-type: '+this.headers['content-type'])); +}; + +IncomingForm.prototype._error = function(err) { + if (this.error || this.ended) { + return; + } + + this.error = err; + this.emit('error', err); + + if (Array.isArray(this.openedFiles)) { + this.openedFiles.forEach(function(file) { + file._writeStream + .on('error', function() {}) + .destroy(); + setTimeout(fs.unlink, 0, file.path, function(error) { }); + }); + } +}; + +IncomingForm.prototype._parseContentLength = function() { + this.bytesReceived = 0; + if (this.headers['content-length']) { + this.bytesExpected = parseInt(this.headers['content-length'], 10); + } else if (this.headers['transfer-encoding'] === undefined) { + this.bytesExpected = 0; + } + + if (this.bytesExpected !== null) { + this.emit('progress', this.bytesReceived, this.bytesExpected); + } +}; + +IncomingForm.prototype._newParser = function() { + return new MultipartParser(); +}; + +IncomingForm.prototype._initMultipart = function(boundary) { + this.type = 'multipart'; + + var parser = new MultipartParser(), + self = this, + headerField, + headerValue, + part; + + parser.initWithBoundary(boundary); + + parser.onPartBegin = function() { + part = new Stream(); + part.readable = true; + part.headers = {}; + part.name = null; + part.filename = null; + part.mime = null; + + part.transferEncoding = 'binary'; + part.transferBuffer = ''; + + headerField = ''; + headerValue = ''; + }; + + parser.onHeaderField = function(b, start, end) { + headerField += b.toString(self.encoding, start, end); + }; + + parser.onHeaderValue = function(b, start, end) { + headerValue += b.toString(self.encoding, start, end); + }; + + parser.onHeaderEnd = function() { + headerField = headerField.toLowerCase(); + part.headers[headerField] = headerValue; + + // matches either a quoted-string or a token (RFC 2616 section 19.5.1) + var m = headerValue.match(/\bname=("([^"]*)"|([^\(\)<>@,;:\\"\/\[\]\?=\{\}\s\t/]+))/i); + if (headerField == 'content-disposition') { + if (m) { + part.name = m[2] || m[3] || ''; + } + + part.filename = self._fileName(headerValue); + } else if (headerField == 'content-type') { + part.mime = headerValue; + } else if (headerField == 'content-transfer-encoding') { + part.transferEncoding = headerValue.toLowerCase(); + } + + headerField = ''; + headerValue = ''; + }; + + parser.onHeadersEnd = function() { + switch(part.transferEncoding){ + case 'binary': + case '7bit': + case '8bit': + parser.onPartData = function(b, start, end) { + part.emit('data', b.slice(start, end)); + }; + + parser.onPartEnd = function() { + part.emit('end'); + }; + break; + + case 'base64': + parser.onPartData = function(b, start, end) { + part.transferBuffer += b.slice(start, end).toString('ascii'); + + /* + four bytes (chars) in base64 converts to three bytes in binary + encoding. So we should always work with a number of bytes that + can be divided by 4, it will result in a number of buytes that + can be divided vy 3. + */ + var offset = parseInt(part.transferBuffer.length / 4, 10) * 4; + part.emit('data', new Buffer(part.transferBuffer.substring(0, offset), 'base64')); + part.transferBuffer = part.transferBuffer.substring(offset); + }; + + parser.onPartEnd = function() { + part.emit('data', new Buffer(part.transferBuffer, 'base64')); + part.emit('end'); + }; + break; + + default: + return self._error(new Error('unknown transfer-encoding')); + } + + self.onPart(part); + }; + + + parser.onEnd = function() { + self.ended = true; + self._maybeEnd(); + }; + + this._parser = parser; +}; + +IncomingForm.prototype._fileName = function(headerValue) { + // matches either a quoted-string or a token (RFC 2616 section 19.5.1) + var m = headerValue.match(/\bfilename=("(.*?)"|([^\(\)<>@,;:\\"\/\[\]\?=\{\}\s\t/]+))($|;\s)/i); + if (!m) return; + + var match = m[2] || m[3] || ''; + var filename = match.substr(match.lastIndexOf('\\') + 1); + filename = filename.replace(/%22/g, '"'); + filename = filename.replace(/&#([\d]{4});/g, function(m, code) { + return String.fromCharCode(code); + }); + return filename; +}; + +IncomingForm.prototype._initUrlencoded = function() { + this.type = 'urlencoded'; + + var parser = new QuerystringParser(this.maxFields) + , self = this; + + parser.onField = function(key, val) { + self.emit('field', key, val); + }; + + parser.onEnd = function() { + self.ended = true; + self._maybeEnd(); + }; + + this._parser = parser; +}; + +IncomingForm.prototype._initOctetStream = function() { + this.type = 'octet-stream'; + var filename = this.headers['x-file-name']; + var mime = this.headers['content-type']; + + var file = new File({ + path: this._uploadPath(filename), + name: filename, + type: mime + }); + + this.emit('fileBegin', filename, file); + file.open(); + this.openedFiles.push(file); + this._flushing++; + + var self = this; + + self._parser = new OctetParser(); + + //Keep track of writes that haven't finished so we don't emit the file before it's done being written + var outstandingWrites = 0; + + self._parser.on('data', function(buffer){ + self.pause(); + outstandingWrites++; + + file.write(buffer, function() { + outstandingWrites--; + self.resume(); + + if(self.ended){ + self._parser.emit('doneWritingFile'); + } + }); + }); + + self._parser.on('end', function(){ + self._flushing--; + self.ended = true; + + var done = function(){ + file.end(function() { + self.emit('file', 'file', file); + self._maybeEnd(); + }); + }; + + if(outstandingWrites === 0){ + done(); + } else { + self._parser.once('doneWritingFile', done); + } + }); +}; + +IncomingForm.prototype._initJSONencoded = function() { + this.type = 'json'; + + var parser = new JSONParser(this) + , self = this; + + parser.onField = function(key, val) { + self.emit('field', key, val); + }; + + parser.onEnd = function() { + self.ended = true; + self._maybeEnd(); + }; + + this._parser = parser; +}; + +IncomingForm.prototype._uploadPath = function(filename) { + var buf = crypto.randomBytes(16); + var name = 'upload_' + buf.toString('hex'); + + if (this.keepExtensions) { + var ext = path.extname(filename); + ext = ext.replace(/(\.[a-z0-9]+).*/i, '$1'); + + name += ext; + } + + return path.join(this.uploadDir, name); +}; + +IncomingForm.prototype._maybeEnd = function() { + if (!this.ended || this._flushing || this.error) { + return; + } + + this.emit('end'); +}; diff --git a/api/formidable/index.js b/api/formidable/index.js new file mode 100644 index 0000000..7a6e3e1 --- /dev/null +++ b/api/formidable/index.js @@ -0,0 +1,3 @@ +var IncomingForm = require('./incoming_form').IncomingForm; +IncomingForm.IncomingForm = IncomingForm; +module.exports = IncomingForm; diff --git a/api/formidable/json_parser.js b/api/formidable/json_parser.js new file mode 100644 index 0000000..28a23ba --- /dev/null +++ b/api/formidable/json_parser.js @@ -0,0 +1,30 @@ +if (global.GENTLY) require = GENTLY.hijack(require); + +var Buffer = require('buffer').Buffer; + +function JSONParser(parent) { + this.parent = parent; + this.chunks = []; + this.bytesWritten = 0; +} +exports.JSONParser = JSONParser; + +JSONParser.prototype.write = function(buffer) { + this.bytesWritten += buffer.length; + this.chunks.push(buffer); + return buffer.length; +}; + +JSONParser.prototype.end = function() { + try { + var fields = JSON.parse(Buffer.concat(this.chunks)); + for (var field in fields) { + this.onField(field, fields[field]); + } + } catch (e) { + this.parent.emit('error', e); + } + this.data = null; + + this.onEnd(); +}; diff --git a/api/formidable/multipart_parser.js b/api/formidable/multipart_parser.js new file mode 100644 index 0000000..36de2b0 --- /dev/null +++ b/api/formidable/multipart_parser.js @@ -0,0 +1,332 @@ +var Buffer = require('buffer').Buffer, + s = 0, + S = + { PARSER_UNINITIALIZED: s++, + START: s++, + START_BOUNDARY: s++, + HEADER_FIELD_START: s++, + HEADER_FIELD: s++, + HEADER_VALUE_START: s++, + HEADER_VALUE: s++, + HEADER_VALUE_ALMOST_DONE: s++, + HEADERS_ALMOST_DONE: s++, + PART_DATA_START: s++, + PART_DATA: s++, + PART_END: s++, + END: s++ + }, + + f = 1, + F = + { PART_BOUNDARY: f, + LAST_BOUNDARY: f *= 2 + }, + + LF = 10, + CR = 13, + SPACE = 32, + HYPHEN = 45, + COLON = 58, + A = 97, + Z = 122, + + lower = function(c) { + return c | 0x20; + }; + +for (s in S) { + exports[s] = S[s]; +} + +function MultipartParser() { + this.boundary = null; + this.boundaryChars = null; + this.lookbehind = null; + this.state = S.PARSER_UNINITIALIZED; + + this.index = null; + this.flags = 0; +} +exports.MultipartParser = MultipartParser; + +MultipartParser.stateToString = function(stateNumber) { + for (var state in S) { + var number = S[state]; + if (number === stateNumber) return state; + } +}; + +MultipartParser.prototype.initWithBoundary = function(str) { + this.boundary = new Buffer(str.length+4); + this.boundary.write('\r\n--', 0); + this.boundary.write(str, 4); + this.lookbehind = new Buffer(this.boundary.length+8); + this.state = S.START; + + this.boundaryChars = {}; + for (var i = 0; i < this.boundary.length; i++) { + this.boundaryChars[this.boundary[i]] = true; + } +}; + +MultipartParser.prototype.write = function(buffer) { + var self = this, + i = 0, + len = buffer.length, + prevIndex = this.index, + index = this.index, + state = this.state, + flags = this.flags, + lookbehind = this.lookbehind, + boundary = this.boundary, + boundaryChars = this.boundaryChars, + boundaryLength = this.boundary.length, + boundaryEnd = boundaryLength - 1, + bufferLength = buffer.length, + c, + cl, + + mark = function(name) { + self[name+'Mark'] = i; + }, + clear = function(name) { + delete self[name+'Mark']; + }, + callback = function(name, buffer, start, end) { + if (start !== undefined && start === end) { + return; + } + + var callbackSymbol = 'on'+name.substr(0, 1).toUpperCase()+name.substr(1); + if (callbackSymbol in self) { + self[callbackSymbol](buffer, start, end); + } + }, + dataCallback = function(name, clear) { + var markSymbol = name+'Mark'; + if (!(markSymbol in self)) { + return; + } + + if (!clear) { + callback(name, buffer, self[markSymbol], buffer.length); + self[markSymbol] = 0; + } else { + callback(name, buffer, self[markSymbol], i); + delete self[markSymbol]; + } + }; + + for (i = 0; i < len; i++) { + c = buffer[i]; + switch (state) { + case S.PARSER_UNINITIALIZED: + return i; + case S.START: + index = 0; + state = S.START_BOUNDARY; + case S.START_BOUNDARY: + if (index == boundary.length - 2) { + if (c == HYPHEN) { + flags |= F.LAST_BOUNDARY; + } else if (c != CR) { + return i; + } + index++; + break; + } else if (index - 1 == boundary.length - 2) { + if (flags & F.LAST_BOUNDARY && c == HYPHEN){ + callback('end'); + state = S.END; + flags = 0; + } else if (!(flags & F.LAST_BOUNDARY) && c == LF) { + index = 0; + callback('partBegin'); + state = S.HEADER_FIELD_START; + } else { + return i; + } + break; + } + + if (c != boundary[index+2]) { + index = -2; + } + if (c == boundary[index+2]) { + index++; + } + break; + case S.HEADER_FIELD_START: + state = S.HEADER_FIELD; + mark('headerField'); + index = 0; + case S.HEADER_FIELD: + if (c == CR) { + clear('headerField'); + state = S.HEADERS_ALMOST_DONE; + break; + } + + index++; + if (c == HYPHEN) { + break; + } + + if (c == COLON) { + if (index == 1) { + // empty header field + return i; + } + dataCallback('headerField', true); + state = S.HEADER_VALUE_START; + break; + } + + cl = lower(c); + if (cl < A || cl > Z) { + return i; + } + break; + case S.HEADER_VALUE_START: + if (c == SPACE) { + break; + } + + mark('headerValue'); + state = S.HEADER_VALUE; + case S.HEADER_VALUE: + if (c == CR) { + dataCallback('headerValue', true); + callback('headerEnd'); + state = S.HEADER_VALUE_ALMOST_DONE; + } + break; + case S.HEADER_VALUE_ALMOST_DONE: + if (c != LF) { + return i; + } + state = S.HEADER_FIELD_START; + break; + case S.HEADERS_ALMOST_DONE: + if (c != LF) { + return i; + } + + callback('headersEnd'); + state = S.PART_DATA_START; + break; + case S.PART_DATA_START: + state = S.PART_DATA; + mark('partData'); + case S.PART_DATA: + prevIndex = index; + + if (index === 0) { + // boyer-moore derrived algorithm to safely skip non-boundary data + i += boundaryEnd; + while (i < bufferLength && !(buffer[i] in boundaryChars)) { + i += boundaryLength; + } + i -= boundaryEnd; + c = buffer[i]; + } + + if (index < boundary.length) { + if (boundary[index] == c) { + if (index === 0) { + dataCallback('partData', true); + } + index++; + } else { + index = 0; + } + } else if (index == boundary.length) { + index++; + if (c == CR) { + // CR = part boundary + flags |= F.PART_BOUNDARY; + } else if (c == HYPHEN) { + // HYPHEN = end boundary + flags |= F.LAST_BOUNDARY; + } else { + index = 0; + } + } else if (index - 1 == boundary.length) { + if (flags & F.PART_BOUNDARY) { + index = 0; + if (c == LF) { + // unset the PART_BOUNDARY flag + flags &= ~F.PART_BOUNDARY; + callback('partEnd'); + callback('partBegin'); + state = S.HEADER_FIELD_START; + break; + } + } else if (flags & F.LAST_BOUNDARY) { + if (c == HYPHEN) { + callback('partEnd'); + callback('end'); + state = S.END; + flags = 0; + } else { + index = 0; + } + } else { + index = 0; + } + } + + if (index > 0) { + // when matching a possible boundary, keep a lookbehind reference + // in case it turns out to be a false lead + lookbehind[index-1] = c; + } else if (prevIndex > 0) { + // if our boundary turned out to be rubbish, the captured lookbehind + // belongs to partData + callback('partData', lookbehind, 0, prevIndex); + prevIndex = 0; + mark('partData'); + + // reconsider the current character even so it interrupted the sequence + // it could be the beginning of a new sequence + i--; + } + + break; + case S.END: + break; + default: + return i; + } + } + + dataCallback('headerField'); + dataCallback('headerValue'); + dataCallback('partData'); + + this.index = index; + this.state = state; + this.flags = flags; + + return len; +}; + +MultipartParser.prototype.end = function() { + var callback = function(self, name) { + var callbackSymbol = 'on'+name.substr(0, 1).toUpperCase()+name.substr(1); + if (callbackSymbol in self) { + self[callbackSymbol](); + } + }; + if ((this.state == S.HEADER_FIELD_START && this.index === 0) || + (this.state == S.PART_DATA && this.index == this.boundary.length)) { + callback(this, 'partEnd'); + callback(this, 'end'); + } else if (this.state != S.END) { + return new Error('MultipartParser.end(): stream ended unexpectedly: ' + this.explain()); + } +}; + +MultipartParser.prototype.explain = function() { + return 'state = ' + MultipartParser.stateToString(this.state); +}; diff --git a/api/formidable/octet_parser.js b/api/formidable/octet_parser.js new file mode 100644 index 0000000..6e8b551 --- /dev/null +++ b/api/formidable/octet_parser.js @@ -0,0 +1,20 @@ +var EventEmitter = require('events').EventEmitter + , util = require('util'); + +function OctetParser(options){ + if(!(this instanceof OctetParser)) return new OctetParser(options); + EventEmitter.call(this); +} + +util.inherits(OctetParser, EventEmitter); + +exports.OctetParser = OctetParser; + +OctetParser.prototype.write = function(buffer) { + this.emit('data', buffer); + return buffer.length; +}; + +OctetParser.prototype.end = function() { + this.emit('end'); +}; diff --git a/api/formidable/querystring_parser.js b/api/formidable/querystring_parser.js new file mode 100644 index 0000000..fcaffe0 --- /dev/null +++ b/api/formidable/querystring_parser.js @@ -0,0 +1,27 @@ +if (global.GENTLY) require = GENTLY.hijack(require); + +// This is a buffering parser, not quite as nice as the multipart one. +// If I find time I'll rewrite this to be fully streaming as well +var querystring = require('querystring'); + +function QuerystringParser(maxKeys) { + this.maxKeys = maxKeys; + this.buffer = ''; +} +exports.QuerystringParser = QuerystringParser; + +QuerystringParser.prototype.write = function(buffer) { + this.buffer += buffer.toString('ascii'); + return buffer.length; +}; + +QuerystringParser.prototype.end = function() { + var fields = querystring.parse(this.buffer, '&', '=', { maxKeys: this.maxKeys }); + for (var field in fields) { + this.onField(field, fields[field]); + } + this.buffer = ''; + + this.onEnd(); +}; + diff --git a/api/ingest.mjs b/api/ingest.mjs new file mode 100644 index 0000000..f4e7f54 --- /dev/null +++ b/api/ingest.mjs @@ -0,0 +1,81 @@ +import path from 'path' +import formidable from './formidable/index.js' +import fs from 'fs/promises' +import { HttpError } from 'flaska' + +export default class IngestRoutes { + constructor(opts = {}) { + Object.assign(this, { + }) + } + + uploadPart(org, ctx) { + let form = formidable.IncomingForm() + form.uploadDir = org.uploadDir + form.maxFileSize = org.maxFileSize || 8 * 1024 * 1024 + form.maxFieldsSize = org.maxFieldsSize || 10 * 1024 + form.maxFields = org.maxFields || 50 + + let filename = ctx.params.path.split('/').slice(-1)[0] + let target = path.join(org.uploadDir, filename) + + return new Promise(function(res, rej) { + form.parse(ctx.req, function(err, fields, files) { + if (err) return rej(new HttpError(400, err.message)) + + ctx.req.body = fields + ctx.req.files = files + ctx.req.file = null + + if (!ctx.req.files) { + return res() + } + + let keys = Object.keys(files).filter(key => Boolean(ctx.req.files[key])) + + Promise.all( + keys.map(key => { + return fs.rm(target, { force: true }) + .then(() => { + return fs.rename(ctx.req.files[key].path, target) + }) + .then(function() { + ctx.req.files[key].path = target + ctx.req.files[key].filename = filename + }) + }) + ) + .then(() => { + if (keys.length === 1 && keys[0] === 'file') { + ctx.req.file = ctx.req.files.file + } + res() + }, rej) + }) + }) + } + + register(server) { + server.flaska.put('/ingest/::path', [ + this.uploadPart.bind(this, { + maxFileSize: 100 * 1024 * 1024, + uploadDir: './upload', + }), + ], this.ingest.bind(this)) + server.flaska.post('/ingest/::path', [ + this.uploadPart.bind(this, { + maxFileSize: 100 * 1024 * 1024, + uploadDir: './upload', + }), + ], this.ingest.bind(this)) + } + + /** POST: /api/ingest */ + async ingest(ctx) { + ctx.log.info(`Got part ${ctx.req.file.filename} size ${ctx.req.file.size}`) + + ctx.body = { + success: true, + } + } +} diff --git a/api/server.mjs b/api/server.mjs new file mode 100644 index 0000000..0250df5 --- /dev/null +++ b/api/server.mjs @@ -0,0 +1,96 @@ +import { Flaska, QueryHandler, JsonHandler, FormidableHandler } from 'flaska' +import StaticRoutes from './static_routes.mjs' +import IngestRoutes from './ingest.mjs' + +export default class Server { + constructor(http, port, core, opts = {}) { + Object.assign(this, opts) + this.http = http + this.port = port + this.core = core + this.pool = null + + this.flaskaOptions = { + log: this.core.log, + } + this.jsonHandler = JsonHandler + this.routes = { + static: new StaticRoutes(), + ingest: new IngestRoutes(), + } + } + + runCreateServer() { + // Create our server + this.flaska = new Flaska(this.flaskaOptions, this.http) + + // configure our server + if (process.env.NODE_ENV !== 'production') { + this.flaska.devMode() + } + + this.flaska.before(function(ctx) { + ctx.state.started = new Date().getTime() + ctx.req.ip = ctx.req.headers['x-forwarded-for'] || ctx.req.connection.remoteAddress + ctx.log = ctx.log.child({ + id: Math.random().toString(36).substring(2, 14), + }) + }.bind(this)) + this.flaska.before(QueryHandler()) + + let healthChecks = 0 + let healthCollectLimit = 60 * 60 * 12 + + this.flaska.after(function(ctx) { + let ended = new Date().getTime() + var requestTime = ended - ctx.state.started + + let status = '' + let level = 'info' + if (ctx.status >= 400) { + status = ctx.status + ' ' + level = 'warn' + } + if (ctx.status >= 500) { + level = 'error' + } + + if (ctx.url === '/health' || ctx.url === '/api/health') { + healthChecks++ + if (healthChecks >= healthCollectLimit) { + ctx.log[level]({ + duration: Math.round(ended), + status: ctx.status, + }, `<-- ${status}${ctx.method} ${ctx.url} {has happened ${healthChecks} times}`) + healthChecks = 0 + } + return + } + + ctx.log[level]({ + duration: requestTime, + status: ctx.status, + ip: ctx.req.ip, + }, (ctx.aborted ? '-->' : '<--') + ` ${status}${ctx.method} ${ctx.url}`) + }) + } + + runRegisterRoutes() { + let keys = Object.keys(this.routes) + for (let key of keys) { + this.routes[key].register(this) + } + } + + runStartListen() { + return this.flaska.listenAsync(this.port).then(() => { + this.core.log.info('Server is listening on port ' + this.port) + }) + } + + run() { + this.runCreateServer() + this.runRegisterRoutes() + return this.runStartListen() + } +} diff --git a/api/static_routes.mjs b/api/static_routes.mjs new file mode 100644 index 0000000..0c730d5 --- /dev/null +++ b/api/static_routes.mjs @@ -0,0 +1,16 @@ +export default class StaticRoutes { + constructor(opts = {}) { + Object.assign(this, { }) + } + + register(server) { + server.flaska.get('/api/health', this.health.bind(this)) + server.flaska.get('/', this.health.bind(this)) + } + + health(ctx) { + ctx.body = { + environment: ctx.config, + } + } +} diff --git a/index.mjs b/index.mjs new file mode 100644 index 0000000..68c6c5c --- /dev/null +++ b/index.mjs @@ -0,0 +1,35 @@ +import fs from 'fs' +import { pathToFileURL } from 'url' + +export function start(http, port, ctx) { + return import('./api/server.mjs') + .then(function(module) { + let server = new module.default(http, port, ctx) + return server.run() + }) +} + +if (import.meta.url === pathToFileURL(process.argv[1]).href) { + import('service-core').then(core => { + const port = 4510 + + var core = new core.ServiceCore('distributer', import.meta.url, port, '') + + let config = { + frontend: { + url: 'http://localhost:' + port + } + } + + try { + config = JSON.parse(fs.readFileSync('./config.json')) + } catch {} + + config.port = port + + core.setConfig(config) + core.init({ start }).then(function() { + return core.run() + }) + }) +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..bb6fb67 --- /dev/null +++ b/package.json @@ -0,0 +1,34 @@ +{ + "name": "filadelfia_distributer", + "version": "1.0.0", + "description": "Distributer for filadelfia streamer.", + "main": "index.mjs", + "scripts": { + "test": "eltro", + "dev:server": "node index.mjs | bunyan", + "dev": "npm-watch dev:server" + }, + "watch": { + "dev:server": { + "patterns": [ + "api/*" + ], + "extensions": "js,mjs", + "quiet": true, + "inherit": true + } + }, + "repository": { + "type": "git", + "url": "https://git.nfp.is/TheThing/filadelfia_distributer.git" + }, + "author": "Jonatan Nilsson", + "license": "WTFPL", + "dependencies": { + "flaska": "^1.3.4" + }, + "devDependencies": { + "eltro": "^1.4.4", + "service-core": "^3.0.1" + } +} diff --git a/upload/.gitkeep b/upload/.gitkeep new file mode 100644 index 0000000..e69de29