import { EventEmitter } from 'events'
import cluster from 'cluster'
import fs from 'fs/promises'
import { request } from './client.mjs'
import HttpServer from './http.mjs'
import { defaults } from './defaults.mjs'
import Util from './util.mjs'
import bunyan from 'bunyan-lite'
import getLog from './log.mjs'

/**
 * A single managed application: checks its provider for new versions,
 * downloads/extracts/installs them, and runs a given version either
 * in-process or as a set of cluster workers.
 *
 * Events emitted:
 *  - 'updating'  (boolean)  when an update run starts or stops
 *  - 'updatelog' (string)   the update log accumulated so far
 *  - 'update'    (object)   the version object about to be downloaded
 *  - 'updated'   (object)   the version object after a successful install
 *  - 'running'   (string)   the version now running ('' when stopped/failed)
 */
export default class Application extends EventEmitter {
  /**
   * @param {object} ctx - Shared context; `db`, `util`, `log` and `core` are read from it.
   * @param {object} provider - Version provider; this class uses `static`,
   *   `getLatestVersion()` and `downloadVersion(version, target)`.
   * @param {string} name - Application name, used as key into `db.config` and `db.data.core`.
   * @param {object} [opts] - Optional overrides for `setInterval`, `setTimeout`,
   *   `fs` and `cluster` (falls back to the real implementations).
   */
  constructor(ctx, provider, name, opts = {}) {
    super()

    this.ctx = {
      db: ctx.db,
      util: ctx.util,
      log: ctx.log,
      core: ctx.core,
      app: this,
      sc: {
        Util: Util,
        bunyan: bunyan,
        getLog: getLog,
        HttpServer: HttpServer,
        request: request,
      },
      version: '',
    }
    this.config = defaults({}, this.ctx.db.config[name])
    this.provider = provider
    this.name = name
    this.updating = false
    this.http = new HttpServer(this.config)
    this.module = null
    this.workers = {}

    // Fresh is used to indicate that when we run the application and it fails,
    // whether the environment we are in was fresh or not. An example would be
    // if we had previously run an older version. In that case, that older version
    // might have dirtied the runtime or left a port open or other stuff.
    // In which case, running the new version might fail even though it should
    // normally be fine. As such we have this flag here to indicate we might
    // need a full restart before making another attempt.
    this.fresh = true

    // Apply defaults to config. updateEvery uses a null check because
    // zero is a meaningful value (it disables the auto updater).
    this.config.updateEvery = this.config.updateEvery != null ? this.config.updateEvery : 180
    this.config.startWaitUntilFail = this.config.startWaitUntilFail || (60 * 1000)
    this.config.heartbeatTimeout = this.config.heartbeatTimeout || (3 * 1000)
    this.config.heartbeatAttempts = this.config.heartbeatAttempts || 5
    this.config.heartbeatAttemptsWait = this.config.heartbeatAttemptsWait || (2 * 1000)
    this.config.heartbeatPath = this.config.heartbeatPath || '/'
    this.config.clusterWaitOnCrash = this.config.clusterWaitOnCrash || (1 * 1000)

    this.ctx.config = this.config

    Object.assign(this, {
      setInterval: opts.setInterval || setInterval,
      setTimeout: opts.setTimeout || setTimeout,
      fs: opts.fs || fs,
      cluster: opts.cluster || cluster
    })

    this.isSlave = this.cluster.isWorker
    this.monitoringCluster = false

    // Worker slots are numbered 1..config.cluster and start out empty.
    if (this.config.cluster && !this.isSlave) {
      for (let i = 1; i <= this.config.cluster; i++) {
        this.workers[i] = null
      }
    }

    this.ctx.db.addApplication(name)
  }

  /**
   * Start a recurring timer that runs update() every `config.updateEvery`
   * minutes. Does nothing for static providers, when updateEvery is zero,
   * or when called from within a cluster worker.
   */
  startAutoupdater() {
    if (this.provider.static) return
    if (this.config.updateEvery === 0) return
    if (this.isSlave) {
      this.ctx.log.warn('app.startAutoupdater was called from within a slave')
      return
    }

    let timer = this.setInterval(() => {
      // update() returns null instead of a promise when an update is
      // already in progress, in which case there is nothing to wait for.
      let pending = this.update()
      if (!pending) return

      pending.then(
        () => {
          this.ctx.db.data.core[this.name].updater += 'Automatic update finished successfully. '
        },
        (err) => {
          this.ctx.db.data.core[this.name].updater += 'Error while running automatic update: ' + err.message + '. '
        }
      )
    }, this.config.updateEvery * 60 * 1000)

    // The update timer alone should not keep the process alive.
    timer.unref()
  }

  /**
   * Append a message to the stored updater log and write it to the logger.
   *
   * @param {string} message - Text to append and log.
   * @param {string} [level='info'] - Name of the logger method to call.
   * @returns {string} The same message, for easy concatenation.
   */
  updateLog(message, level = 'info') {
    this.ctx.db.data.core[this.name].updater += message
    this.ctx.log[level](message)
    return message
  }

  msgStatic = 'Provider in question is static and so no update required, nothing to do.'

  /**
   * Run a single update check and install a new version if one is found.
   *
   * @returns {Promise<object|null>|null} Plain `null` when called from a
   *   worker or while another update is running. Otherwise a promise that
   *   resolves to the installed version object, or to null when there was
   *   nothing to do, and rejects with the error if the update failed.
   */
  update() {
    if (this.isSlave) {
      this.ctx.log.warn('app.update was called from within a slave')
      return null
    }

    if (this.provider.static) {
      // Only store the static message once, to avoid rewriting the database
      // on every call.
      if (this.ctx.db.data.core[this.name].updater !== this.msgStatic) {
        this.ctx.db.data.core[this.name].updater = ''
        this.updateLog(this.msgStatic)
        this.emit('updatelog', this.msgStatic)
        return this.ctx.db.write().then(function() { return null })
      }
      return Promise.resolve(null)
    }

    if (this.updating) return null

    this.updating = true
    this.emit('updating', this.updating)

    return this._update()
      .then((result) => {
        this.updating = false
        this.emit('updating', this.updating)
        return this.ctx.db.write()
          .then(function() { return result })
      })
      .then((result) => {
        if (result) {
          this.emit('updated', result)
        }
        return result
      })
      .catch((err) => {
        // Mirror the success path so listeners are told the update stopped.
        // The guard prevents a second emit when the failure happened after
        // the flag had already been cleared above.
        if (this.updating) {
          this.updating = false
          this.emit('updating', this.updating)
        }
        return this.ctx.db.write()
          .then(function() { return Promise.reject(err) })
      })
  }

  /**
   * Make sure the log ends with a blank line (two trailing newlines).
   *
   * @param {string} log - Log text so far.
   * @returns {string} Log text ending in '\n\n'.
   */
  logAddSeperator(log) {
    if (!log.endsWith('\n')) {
      log += '\n'
    }
    if (!log.endsWith('\n\n')) {
      log += '\n'
    }
    return log
  }

  /**
   * Perform the actual update: look up the latest version, download and
   * extract it, verify it contains index.mjs and run npm install if it
   * ships a package.json.
   *
   * @returns {Promise<object|null>} The installed version object, or null
   *   when already up to date or when the version is being skipped.
   *   Rejects with the underlying error on failure.
   */
  async _update() {
    this.ctx.db.data.core[this.name].updater = ''
    let cleanup = true
    let folder = ''
    let log = ''
    let latest = null

    try {
      log += this.updateLog(`Checking for latest version at ${new Date().toISOString().replace('T', ' ').split('.')[0]}. `, 'debug') + '\n'
      this.emit('updatelog', log)

      // Get the latest version from our provider
      latest = await this.provider.getLatestVersion()

      // If the version matches the latest installed, then there's nothing to do
      if (this.ctx.db.data.core[this.name].latestInstalled === latest.version) {
        log += this.updateLog(`Found ${latest.version}. `, 'debug') + '\n'
        log += this.updateLog('Already up to date, nothing to do. ', 'debug')
        this.emit('updatelog', log)
        return null
      }

      log += this.updateLog(`Found new update ${latest.version}. `) + '\n'

      // Make the id for the version the version number. Allows for easy lookup
      // among other nice and simple structure.
      latest.id = latest.version

      // Check to see if we already have this version in our database.
      let found = this.ctx.db.get(this.ctx.db.data.core[this.name].versions, latest.id)
      if (found) {
        // Check if the existing version found was already installed.
        if (found.installed) {
          log += this.updateLog('Version was already installed, nothing to do. ')
          this.emit('updatelog', log)
          return null
        }

        // We found an existing one, update the keys of the one in the database
        // with the latest data we got from getLatestVersion(). This ensures that
        // info like link, filename and such get updated if these have changed since.
        Object.keys(latest).forEach(function(key) {
          found[key] = latest[key]
        })
        latest = found

        // Check to see if the existing one has failed too many times and
        // if so, we should skip them and consider those versions as black
        // listed and avoid at all cost.
        if (latest.failtodownload && latest.failtodownload > 3) {
          log += this.updateLog('Version failed to download too many times, skipping this version. ')
          this.emit('updatelog', log)
          return null
        }
        if (latest.failtoinstall && latest.failtoinstall > 3) {
          log += this.updateLog('Version failed to install too many times, skipping this version. ')
          this.emit('updatelog', log)
          return null
        }

        // Combine the logs. A stored version may not have a log yet, in
        // which case we must not prefix the text "undefined".
        log = (latest.log || '') + log
      } else {
        // This is a new version, mark it with stable tag of zero.
        latest.stable = 0
        this.ctx.db.upsertFirst(this.ctx.db.data.core[this.name].versions, latest)
      }

      this.emit('updatelog', log)
      this.emit('update', latest)

      // The target file for the archive and the target folder for our new version
      let target = this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/file${this.ctx.util.getExtension(latest.filename)}`)
      folder = this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}`)

      // Create it in case it does not exist.
      await this.fs.mkdir(folder, { recursive: true })

      log += this.updateLog(`Downloading ${latest.link} to ${target}. `) + '\n'
      this.emit('updatelog', log)
      await this.ctx.db.write()

      // Download the latest version using the provider in question.
      await this.provider.downloadVersion(latest, target)
        .catch(function(err) {
          latest.failtodownload = (latest.failtodownload || 0) + 1
          return Promise.reject(err)
        })

      log += '\n' + this.updateLog(`Extracting ${target}. `) + '\n'
      this.emit('updatelog', log)
      await this.ctx.db.write()

      // Download was successful, extract the archived file that we downloaded
      await this.ctx.util.extractFile(target, (msg) => {
        log += msg
        this.emit('updatelog', log)
      }).catch(function(err) {
        latest.failtodownload = (latest.failtodownload || 0) + 1
        return Promise.reject(err)
      })

      // Remove the archived file since we're done using it. Failing to
      // remove it is deliberately ignored.
      await this.fs.rm(target, { force: true }).catch(function() {})

      // The extracting process might not leave enough newlines for our
      // desired clean output for our logs so add them.
      log = this.logAddSeperator(log)

      // Check if the version we downloaded had index.mjs. If this is
      // missing then either the extracting or download failed without erroring
      // or the archive is broken.
      await this.fs.stat(this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/index.mjs`))
        .catch((err) => {
          latest.failtodownload = (latest.failtodownload || 0) + 1
          log += this.updateLog('Version did not include or was missing index.mjs. ') + '\n'
          this.emit('updatelog', log)
          return Promise.reject(err)
        })

      // If we reach here, then we don't wanna cleanup or remove existing files
      // in case more errors occurred. The download was success and preliminary
      // checks indicate the version is valid. As such, we are gonna skip
      // clearing the folder even if something occurs later on.
      cleanup = false

      // Check if we have a package.json file. If we do, we need to run
      // npm install. If we don't then this application either has all the
      // required packages or it doesn't need them to run
      let packageStat = await this.fs.stat(this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/package.json`))
        .catch(function() { return null })

      if (packageStat) {
        log += this.updateLog(`running npm install --production. `) + '\n'
        this.emit('updatelog', log)
        await this.ctx.db.write()

        // For some weird reason, --loglevel=notice is required otherwise
        // we get practically zero log output.
        await this.ctx.util.runCommand(
          this.ctx.util.getNpmExecutable(),
          ['install', '--production', '--no-optional', '--no-package-lock', '--no-audit', '--loglevel=notice'],
          folder,
          (msg) => {
            log += msg
            this.emit('updatelog', log)
          }
        ).catch(function(err) {
          latest.failtoinstall = (latest.failtoinstall || 0) + 1
          return Promise.reject(err)
        })

        log = this.logAddSeperator(log)
        this.emit('updatelog', log)
      } else {
        log += this.updateLog('Release did not contain package.json, skipping npm install. ') + '\n'
        this.emit('updatelog', log)
      }
    } catch (err) {
      log += this.updateLog(`Error: ${err.message}. `) + '\n'

      // Check if we have a folder and we need to do some cleanups. We do
      // this if the download process failed so we can have a fresh clean
      // tree for the next time update is run
      if (folder && cleanup) {
        await this.fs.rm(folder, { force: true, recursive: true }).catch((cleanupErr) => {
          this.updateLog(`Error while cleaning up: ${cleanupErr.message}. `)
        })
      }
      if (latest) {
        latest.log = log
      }
      this.emit('updatelog', log)
      return Promise.reject(err)
    }

    // If we reached here then everything went swimmingly. Mark the version
    // as being installed and attach the install log to it.
    log += this.updateLog(`Finished updating ${this.name} to version ${latest.version}.`) + '\n'
    this.emit('updatelog', log)

    this.ctx.db.data.core[this.name].latestInstalled = latest.version
    latest.installed = true
    latest.log = log
    return latest
  }

  /**
   * Register the module that will be started by _runVersion().
   *
   * @param {Function|object} module - Either a start function, or an object
   *   with a `start` function.
   * @param {string} [version=''] - Version label, only used in the error message.
   * @throws {Error} If no start function could be found.
   */
  registerModule(module, version = '') {
    if (module && typeof(module) === 'function') {
      return this.registerModule({ start: module })
    }
    if (!module || typeof(module) !== 'object' || typeof(module.start) !== 'function') {
      throw new Error(`Application ${this.name}${version ? ' version ' + version : '' } registerModule was called with a non module missing start function`)
    }

    this.module = module
  }

  /**
   * Cluster 'exit' handler. Frees the slot of a worker that we are tracking
   * and forks a replacement after `config.clusterWaitOnCrash` milliseconds.
   *
   * @param {object} worker - The worker that exited.
   */
  workerDied(worker) {
    // Ignore workers we are not (or no longer) tracking in that slot.
    if (this.workers[worker.w_id] !== worker) {
      return
    }

    this.workers[worker.w_id] = null
    this.setTimeout(() => {
      // The slot might have been filled again while we were waiting.
      if (this.workers[worker.w_id]) return
      this.startForkProcess(worker.w_id, this.ctx.db.data.core[this.name].active)
    }, this.config.clusterWaitOnCrash)
  }

  /**
   * Fork a new cluster worker into slot `i` for the given version.
   *
   * @param {number} i - Worker slot index.
   * @param {string} version - Version passed to the worker through its environment.
   */
  startForkProcess(i, version) {
    this.workers[i] = this.cluster.fork({
      CLUSTER_APP_NAME: this.name,
      CLUSTER_APP_VERSION: version,
      CLUSTER_APP_INDEX: i,
    })

    let pid = this.workers[i].process.pid
    this.ctx.log.info(`Creating worker ${i} (${pid})`)

    this.workers[i].once('exit', (code, signal) => {
      // Workers killed by a signal are not logged, only those that
      // exited on their own with a non-zero code.
      if (!signal && code !== 0) {
        this.ctx.log.warn(`Worker ${i} (${pid}) exited with code ${code}`)
      }
    })
    this.workers[i].w_id = i
    this.workers[i].listening = false
    this.workers[i].started = new Date()
  }

  /**
   * Mark the version as active, persist that, and start it.
   *
   * @param {string} version - Version to run.
   * @returns {Promise<void>} Rejects if the version failed to start, in
   *   which case `ctx.version` is reset to '' and 'running' is emitted again.
   */
  runVersion(version) {
    this.ctx.db.data.core[this.name].active = version
    this.ctx.version = version
    this.emit('running', this.ctx.version)

    return this.ctx.db.write().then(() => {
      return this._runVersion(version)
        .catch((err) => {
          this.ctx.version = ''
          this.emit('running', this.ctx.version)
          return Promise.reject(err)
        })
    })
  }

  /**
   * Start the given version. In cluster mode on the primary this forks the
   * workers, otherwise it imports the version's index.mjs (unless the version
   * is 'static') and calls its start(). Outside of workers it then polls the
   * heartbeat url until it responds or the attempts run out.
   *
   * @param {string} version - Version to run.
   * @returns {Promise<void>} Rejects if index.mjs is missing, start() fails
   *   or times out, no http server was created, or the heartbeat never succeeds.
   */
  async _runVersion(version) {
    this.ctx.db.data.core[this.name].active = version
    await this.ctx.db.write()

    if (this.config.cluster && !this.isSlave) {
      for (let i = 1; i <= this.config.cluster; i++) {
        this.startForkProcess(i, version)
      }
    } else {
      if (version !== 'static') {
        let indexPath = this.ctx.util.getPathFromRoot(`./${this.name}/${version}/index.mjs`)
        await this.fs.stat(indexPath).catch((err) => {
          return Promise.reject(new Error(`Version was missing index.mjs: ${err.message}`))
        })

        this.fresh = false
        let module = await import(this.ctx.util.getUrlFromRoot(`./${this.name}/${version}/index.mjs`))
        this.registerModule(module, version)
      } else {
        this.fresh = false
      }

      // Created up front so the stack trace points here and not into the timer.
      let errTimeout = new Error(`Version timed out (took over ${this.config.startWaitUntilFail}ms) while running start()`)

      let timer = null
      try {
        await new Promise((res, rej) => {
          timer = setTimeout(() => {
            rej(errTimeout)
          }, this.config.startWaitUntilFail)

          let startRes = this.module.start(this.http, this.config.port || null, this.ctx)
          if (startRes && startRes.then) {
            return startRes.then(res, rej)
          }
          res()
        })
      } finally {
        // Don't leave the timeout pending once start() has settled.
        clearTimeout(timer)
      }

      if (!this.http.active) {
        return Promise.reject(new Error(`Version did not call http.createServer()`))
      }
    }

    // Workers skip the heartbeat check.
    if (this.isSlave) {
      return
    }

    let lastErr = null

    for (let i = 0; i < this.config.heartbeatAttempts; i++) {
      try {
        await request({ timeout: this.config.heartbeatTimeout }, `http://localhost:${this.config.port}` + this.config.heartbeatPath, null, 0, true)
        lastErr = null
        break
      } catch (err) {
        lastErr = err
        // Give the application some time before the next attempt.
        await new Promise((res) => {
          setTimeout(res, this.config.heartbeatAttemptsWait)
        })
      }
    }

    if (lastErr) {
      let err = new Error(`Version failed to start properly: ${lastErr.message}`)
      err.stack = lastErr.stack
      return Promise.reject(err)
    }

    // Only start replacing dead workers once the version is known to be healthy.
    if (this.config.cluster && !this.isSlave) {
      this.__clusterWorkerDied = this.workerDied.bind(this)
      this.cluster.on('exit', this.__clusterWorkerDied)
    }
  }

  /**
   * Stop the running version: reset `ctx.version`, emit 'running', stop
   * monitoring and kill all cluster workers (primary only) and close the
   * http server.
   *
   * @returns {*} Whatever `http.closeServer()` returns.
   */
  closeServer() {
    this.ctx.version = ''
    this.emit('running', this.ctx.version)

    if (this.config.cluster && !this.isSlave) {
      // Stop listening first so killing the workers below does not
      // cause them to be forked again.
      if (this.__clusterWorkerDied) {
        this.cluster.off('exit', this.__clusterWorkerDied)
      }
      this.__clusterWorkerDied = null

      for (let i = 1; i <= this.config.cluster; i++) {
        if (this.workers[i]) {
          this.workers[i].process.kill()
          this.workers[i] = null
        }
      }
    }

    return this.http.closeServer()
  }
}