service-core/core/application.mjs

498 lines
17 KiB
JavaScript

import { EventEmitter } from 'events'
import cluster from 'cluster'
import fs from 'fs/promises'
import { request } from './client.mjs'
import HttpServer from './http.mjs'
import { defaults } from './defaults.mjs'
import Util from './util.mjs'
import bunyan from 'bunyan-lite'
import getLog from './log.mjs'
/**
 * Application wraps a single named application managed by the service core.
 *
 * It is responsible for:
 *  - periodically asking its provider for a newer version and installing it
 *    (download, extract, optional `npm install`),
 *  - running a given version, either in-process or as a set of cluster
 *    workers, and verifying it through an HTTP heartbeat,
 *  - shutting the running version down again.
 *
 * Events emitted by this class:
 *  - 'updating'  (boolean)  whenever the updating flag flips
 *  - 'updatelog' (string)   with the update log accumulated so far
 *  - 'update'    (version)  just before a new version is downloaded
 *  - 'updated'   (version)  after a new version was installed
 *  - 'running'   (string)   the version being run, or '' when stopped/failed
 */
export default class Application extends EventEmitter {
  /**
   * @param {object} ctx - Shared context; `db`, `util`, `log` and `core` are
   *   copied from it into this application's own context object.
   * @param {object} provider - Version provider. This class reads `static`
   *   and calls `getLatestVersion()` and `downloadVersion(version, target)`.
   * @param {string} name - Application name; used as key into `db.config`
   *   and `db.data.core` and as folder name for installed versions.
   * @param {object} [opts] - Optional overrides for `setInterval`,
   *   `setTimeout`, `fs` and `cluster` (falls back to the real ones).
   */
  constructor(ctx, provider, name, opts = {}) {
    super()

    this.ctx = {
      db: ctx.db,
      util: ctx.util,
      log: ctx.log,
      core: ctx.core,
      app: this,
      sc: {
        Util: Util,
        bunyan: bunyan,
        getLog: getLog,
        HttpServer: HttpServer,
        request: request,
      },
      version: '',
    }
    this.config = defaults({}, this.ctx.db.config[name])
    this.provider = provider
    this.name = name
    this.updating = false
    this.http = new HttpServer(this.config)
    this.module = null
    this.workers = {}

    // Fresh is used to indicate that when we run the application and it fails,
    // whether the environment we are in was fresh or not. An example would be
    // if we had previously run an older version. In that case, that older version
    // might have dirtied the runtime or left a port open or other stuff.
    // In which case, running the new version might fail even though it should
    // normally be fine. As such we have this flag here to indicate we might
    // need a full restart before making another attempt.
    this.fresh = true

    // Apply defaults to config. updateEvery is in minutes (0 disables the
    // autoupdater, hence the explicit null check); the wait values below are
    // passed to setTimeout and are therefore in milliseconds.
    this.config.updateEvery = this.config.updateEvery != null ? this.config.updateEvery : 180
    this.config.startWaitUntilFail = this.config.startWaitUntilFail || (60 * 1000)
    this.config.heartbeatTimeout = this.config.heartbeatTimeout || (3 * 1000)
    this.config.heartbeatAttempts = this.config.heartbeatAttempts || 5
    this.config.heartbeatAttemptsWait = this.config.heartbeatAttemptsWait || (2 * 1000)
    this.config.heartbeatPath = this.config.heartbeatPath || '/'
    this.config.clusterWaitOnCrash = this.config.clusterWaitOnCrash || (1 * 1000)

    this.ctx.config = this.config

    // Injectable dependencies, mainly so these can be replaced by the caller.
    Object.assign(this, {
      setInterval: opts.setInterval || setInterval,
      setTimeout: opts.setTimeout || setTimeout,
      fs: opts.fs || fs,
      cluster: opts.cluster || cluster
    })

    this.isSlave = this.cluster.isWorker
    this.monitoringCluster = false

    // Worker slots are numbered 1..config.cluster and only tracked outside
    // of worker processes.
    if (this.config.cluster && !this.isSlave) {
      for (let i = 1; i <= this.config.cluster; i++) {
        this.workers[i] = null
      }
    }

    this.ctx.db.addApplication(name)
  }

  /**
   * Start a repeating timer that runs update() every `config.updateEvery`
   * minutes. Does nothing for static providers, when `updateEvery` is 0, or
   * when called inside a cluster worker. The timer is unref'ed.
   */
  startAutoupdater() {
    if (this.provider.static) return
    if (this.config.updateEvery === 0) return
    if (this.isSlave) {
      this.ctx.log.warn('app.startAutoupdater was called from within a slave')
      return
    }

    let timer = this.setInterval(() => {
      // update() returns null instead of a promise when it declines to run
      // (for example when a previous update is still in progress). Skip this
      // tick in that case instead of calling .then() on null.
      let pending = this.update()
      if (!pending) return

      pending.then(
        () => {
          this.ctx.db.data.core[this.name].updater += 'Automatic update finished successfully. '
        },
        (err) => {
          this.ctx.db.data.core[this.name].updater += 'Error while running automatic update: ' + err.message + '. '
        }
      )
    }, this.config.updateEvery * 60 * 1000)
    timer.unref()
  }

  /**
   * Append a message to this application's updater text in the database and
   * send it to the logger at the given level.
   *
   * @param {string} message
   * @param {string} [level='info'] - Name of the logger method to call.
   * @returns {string} The same message, so callers can append it to a log.
   */
  updateLog(message, level = 'info') {
    this.ctx.db.data.core[this.name].updater += message
    this.ctx.log[level](message)
    return message
  }

  msgStatic = 'Provider in question is static and so no update required, nothing to do.'

  /**
   * Check for and install a new version.
   *
   * @returns {Promise<object|null>|null} `null` (no promise) when called in a
   *   cluster worker or while another update is already running. Otherwise a
   *   promise resolving to the installed version object, or to `null` when
   *   there was nothing to install. Rejects with the error from the update
   *   (the database is written before rejecting).
   */
  update() {
    if (this.isSlave) {
      this.ctx.log.warn('app.update was called from within a slave')
      return null
    }

    if (this.provider.static) {
      // Only write the static notice once; afterwards it is a no-op.
      if (this.ctx.db.data.core[this.name].updater !== this.msgStatic) {
        this.ctx.db.data.core[this.name].updater = ''
        this.updateLog(this.msgStatic)
        this.emit('updatelog', this.msgStatic)
        return this.ctx.db.write().then(function() { return null })
      }
      return Promise.resolve(null)
    }

    if (this.updating) return null

    this.updating = true
    this.emit('updating', this.updating)

    return this._update()
      .then((result) => {
        this.updating = false
        this.emit('updating', this.updating)
        return this.ctx.db.write()
          .then(function() { return result })
      })
      .then((result) => {
        if (result) {
          this.emit('updated', result)
        }
        return result
      })
      .catch((err) => {
        // Mirror the success path so listeners learn that updating stopped.
        // The guard avoids a second event when the flag was already reset
        // before a later step (e.g. the database write) failed.
        if (this.updating) {
          this.updating = false
          this.emit('updating', this.updating)
        }
        return this.ctx.db.write()
          .then(function() { return Promise.reject(err) })
      })
  }

  /**
   * Make sure the given log text ends with an empty line (two newlines).
   *
   * @param {string} log
   * @returns {string}
   */
  logAddSeperator(log) {
    if (!log.endsWith('\n')) {
      log += '\n'
    }
    if (!log.endsWith('\n\n')) {
      log += '\n'
    }
    return log
  }

  /**
   * Perform the actual update: ask the provider for the latest version and,
   * if it is not installed yet, download it, extract it, verify it contains
   * index.mjs and run `npm install` when a package.json is present.
   *
   * @returns {Promise<object|null>} The installed version object, or `null`
   *   when there is nothing to do (already installed or skipped after too
   *   many failures). Rejects with the first error encountered.
   */
  async _update() {
    this.ctx.db.data.core[this.name].updater = ''
    // Whether the version folder should be removed if something fails.
    let cleanup = true
    let folder = ''
    let log = ''
    let latest = null

    try {
      log += this.updateLog(`Checking for latest version at ${new Date().toISOString().replace('T', ' ').split('.')[0]}. `, 'debug') + '\n'
      this.emit('updatelog', log)

      // Get the latest version from our provider
      latest = await this.provider.getLatestVersion()

      // If the version matches the latest installed, then there's nothing to do
      if (this.ctx.db.data.core[this.name].latestInstalled === latest.version) {
        log += this.updateLog(`Found ${latest.version}. `, 'debug') + '\n'
        log += this.updateLog('Already up to date, nothing to do. ', 'debug')
        this.emit('updatelog', log)
        return null
      }

      log += this.updateLog(`Found new update ${latest.version}. `) + '\n'

      // Make the id for the version the version number. Allows for easy lookup
      // among other nice and simple structure.
      latest.id = latest.version

      // Check to see if we already have this version in our database.
      const found = this.ctx.db.get(this.ctx.db.data.core[this.name].versions, latest.id)
      if (found) {
        // Check if the existing version found was already installed.
        if (found.installed) {
          log += this.updateLog('Version was already installed, nothing to do. ')
          this.emit('updatelog', log)
          return null
        }

        // We found an existing one, update the keys of the one in the database
        // with the latest data we got from getLatestVersion(). This ensures that
        // info like link, filename and such get updated if these have changed.
        Object.keys(latest).forEach(function(key) {
          found[key] = latest[key]
        })
        latest = found

        // Check to see if the existing one has failed too many times and
        // if so, we should skip them and consider those versions as black
        // listed and avoid at all cost.
        if (latest.failtodownload && latest.failtodownload > 3) {
          log += this.updateLog('Version failed to download too many times, skipping this version. ')
          this.emit('updatelog', log)
          return null
        }
        if (latest.failtoinstall && latest.failtoinstall > 3) {
          log += this.updateLog('Version failed to install too many times, skipping this version. ')
          this.emit('updatelog', log)
          return null
        }

        // Combine the logs
        log = latest.log + log
      } else {
        // This is a new version, mark it with stable tag of zero.
        latest.stable = 0
        this.ctx.db.upsertFirst(this.ctx.db.data.core[this.name].versions, latest)
      }

      this.emit('updatelog', log)
      this.emit('update', latest)

      // The target file for the archive and the target folder for our new version
      let target = this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/file${this.ctx.util.getExtension(latest.filename)}`)
      folder = this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}`)

      // Create it in case it does not exist.
      await this.fs.mkdir(folder, { recursive: true })

      log += this.updateLog(`Downloading ${latest.link} to ${target}. `) + '\n'
      this.emit('updatelog', log)
      await this.ctx.db.write()

      // Download the latest version using the provider in question.
      await this.provider.downloadVersion(latest, target)
        .catch(function(err) {
          latest.failtodownload = (latest.failtodownload || 0) + 1
          return Promise.reject(err)
        })

      log += '\n' + this.updateLog(`Extracting ${target}. `) + '\n'
      this.emit('updatelog', log)
      await this.ctx.db.write()

      // Download was successful, extract the archived file that we downloaded.
      // A failed extraction is counted as a failed download.
      await this.ctx.util.extractFile(target, (msg) => {
        log += msg
        this.emit('updatelog', log)
      }).catch(function(err) {
        latest.failtodownload = (latest.failtodownload || 0) + 1
        return Promise.reject(err)
      })

      // Remove the archived file since we're done using it. Failing to remove
      // it is deliberately ignored.
      await this.fs.rm(target, { force: true }).catch(function() {})

      // The extracting process might not leave enough newlines for our
      // desired clean output for our logs so add them.
      log = this.logAddSeperator(log)

      // Check if the version we downloaded had index.mjs. If this is
      // missing then either the extracting or download failed without erroring
      // or the archive is borked.
      await this.fs.stat(this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/index.mjs`))
        .catch((err) => {
          latest.failtodownload = (latest.failtodownload || 0) + 1
          log += this.updateLog('Version did not include or was missing index.mjs. ') + '\n'
          this.emit('updatelog', log)
          return Promise.reject(err)
        })

      // If we reach here, then we don't wanna cleanup or remove existing files
      // in case more errors occurred. The download was a success and preliminary
      // checks indicate the version is valid. As such, we are gonna skip
      // clearing the folder even if something occurs later on.
      cleanup = false

      // Check if we have a package.json file. If we do, we need to run
      // npm install. If we don't then this application either has all the
      // required packages or it doesn't need them to run
      let packageStat = await this.fs.stat(this.ctx.util.getPathFromRoot(`./${this.name}/${latest.version}/package.json`))
        .catch(function() { return null })

      if (packageStat) {
        log += this.updateLog(`running npm install --production. `) + '\n'
        this.emit('updatelog', log)
        await this.ctx.db.write()

        // For some weird reason, --loglevel=notice is required otherwise
        // we get practically zero log output.
        await this.ctx.util.runCommand(
          this.ctx.util.getNpmExecutable(),
          ['install', '--production', '--no-optional', '--no-package-lock', '--no-audit', '--loglevel=notice'],
          folder,
          (msg) => {
            log += msg
            this.emit('updatelog', log)
          }
        ).catch(function(err) {
          latest.failtoinstall = (latest.failtoinstall || 0) + 1
          return Promise.reject(err)
        })

        log = this.logAddSeperator(log)
        this.emit('updatelog', log)
      } else {
        log += this.updateLog('Release did not contain package.json, skipping npm install. ') + '\n'
        this.emit('updatelog', log)
      }
    } catch (err) {
      log += this.updateLog(`Error: ${err.message}. `) + '\n'

      // Check if we have a folder and we need to do some cleanups. We do
      // this if the download process failed so we can have a fresh clean
      // tree for the next time update is run
      if (folder && cleanup) {
        await this.fs.rm(folder, { force: true, recursive: true }).catch((cleanupErr) => {
          this.updateLog(`Error while cleaning up: ${cleanupErr.message}. `)
        })
      }

      // Keep the log on the version so a later attempt can prepend it.
      if (latest) {
        latest.log = log
      }
      this.emit('updatelog', log)
      return Promise.reject(err)
    }

    // If we reached here then everything went swimmingly. Mark the version
    // as being installed and attach the install log to it.
    log += this.updateLog(`Finished updating ${this.name} to version ${latest.version}.`) + '\n'
    this.emit('updatelog', log)

    this.ctx.db.data.core[this.name].latestInstalled = latest.version
    latest.installed = true
    latest.log = log
    return latest
  }

  /**
   * Register the module that _runVersion() will start.
   *
   * @param {object|Function} module - Either an object with a `start`
   *   function, or a bare function which is then used as `start`.
   * @param {string} [version=''] - Only used in the error message.
   * @throws {Error} When no `start` function could be found.
   */
  registerModule(module, version = '') {
    if (module && typeof(module) === 'function') {
      return this.registerModule({ start: module }, version)
    }
    if (!module || typeof(module) !== 'object' || typeof(module.start) !== 'function') {
      throw new Error(`Application ${this.name}${version ? ' version ' + version : '' } registerModule was called with a non module missing start function`)
    }

    this.module = module
  }

  /**
   * Cluster 'exit' handler: free the slot of a worker that died and fork a
   * replacement after `config.clusterWaitOnCrash` milliseconds.
   *
   * @param {object} worker - The worker that exited; ignored unless it is
   *   the one currently stored in its slot (`worker.w_id`).
   */
  workerDied(worker) {
    if (this.workers[worker.w_id] !== worker) {
      return
    }

    this.workers[worker.w_id] = null
    this.setTimeout(() => {
      // The slot may have been refilled while we were waiting.
      if (this.workers[worker.w_id]) return
      this.startForkProcess(worker.w_id, this.ctx.db.data.core[this.name].active)
    }, this.config.clusterWaitOnCrash)
  }

  /**
   * Fork a cluster worker into slot `i`, passing the application name,
   * version and slot index through CLUSTER_APP_* environment variables.
   *
   * @param {number} i - Worker slot index.
   * @param {string} version - Version the worker should run.
   */
  startForkProcess(i, version) {
    const worker = this.cluster.fork({
      CLUSTER_APP_NAME: this.name,
      CLUSTER_APP_VERSION: version,
      CLUSTER_APP_INDEX: i,
    })
    this.workers[i] = worker

    let pid = worker.process.pid
    this.ctx.log.info(`Creating worker ${i} (${pid})`)

    worker.once('exit', (code, signal) => {
      // Exits caused by a signal are not logged; only non-zero exit codes are.
      if (!signal && code !== 0) {
        this.ctx.log.warn(`Worker ${i} (${pid}) exited with code ${code}`)
      }
    })

    worker.w_id = i
    worker.listening = false
    worker.started = new Date()
  }

  /**
   * Mark `version` as active, persist that, and run it.
   * Emits 'running' with the version, and again with '' if running fails.
   *
   * @param {string} version
   * @returns {Promise<void>} Rejects with the error from _runVersion().
   */
  runVersion(version) {
    this.ctx.db.data.core[this.name].active = version
    this.ctx.version = version
    this.emit('running', this.ctx.version)

    return this.ctx.db.write().then(() => {
      return this._runVersion(version)
        .catch((err) => {
          this.ctx.version = ''
          this.emit('running', this.ctx.version)
          return Promise.reject(err)
        })
    })
  }

  /**
   * Run the given version.
   *
   * On a non-worker process with `config.cluster` set, this forks one worker
   * per slot. Otherwise it imports `index.mjs` of the version (skipped when
   * version is 'static', in which case a module must already be registered),
   * calls its `start(http, port, ctx)` with a `startWaitUntilFail` timeout
   * and requires `http.active` to be set afterwards.
   *
   * Non-worker processes then poll the heartbeat URL up to
   * `heartbeatAttempts` times before considering the version started.
   *
   * @param {string} version
   * @returns {Promise<void>}
   */
  async _runVersion(version) {
    this.ctx.db.data.core[this.name].active = version
    await this.ctx.db.write()

    if (this.config.cluster && !this.isSlave) {
      for (let i = 1; i <= this.config.cluster; i++) {
        this.startForkProcess(i, version)
      }
    } else {
      if (version !== 'static') {
        let indexPath = this.ctx.util.getPathFromRoot(`./${this.name}/${version}/index.mjs`)
        await this.fs.stat(indexPath).catch((err) => {
          return Promise.reject(new Error(`Version was missing index.mjs: ${err.message}`))
        })

        // From here on code of the version gets loaded into this process.
        this.fresh = false
        let module = await import(this.ctx.util.getUrlFromRoot(`./${this.name}/${version}/index.mjs`))
        this.registerModule(module, version)
      } else {
        this.fresh = false
      }

      // Created up front so the stack trace points here and not at the timer.
      let errTimeout = new Error(`Version timed out (took over ${this.config.startWaitUntilFail}ms) while running start()`)

      let startTimer = null
      try {
        await new Promise((res, rej) => {
          startTimer = setTimeout(() => {
            rej(errTimeout)
          }, this.config.startWaitUntilFail)

          let startRes = this.module.start(this.http, this.config.port || null, this.ctx)
          if (startRes && startRes.then) {
            return startRes.then(res, rej)
          }
          res()
        })
      } finally {
        // Do not leave the timeout pending once start() has settled.
        clearTimeout(startTimer)
      }

      if (!this.http.active) {
        return Promise.reject(new Error(`Version did not call http.createServer()`))
      }
    }

    // Workers are verified by the process that forked them.
    if (this.isSlave) {
      return
    }

    let lastErr = null

    for (let i = 0; i < this.config.heartbeatAttempts; i++) {
      try {
        await request({ timeout: this.config.heartbeatTimeout }, `http://localhost:${this.config.port}` + this.config.heartbeatPath, null, 0, true)
        lastErr = null
        break
      } catch (err) {
        lastErr = err
        await new Promise((res) => {
          setTimeout(res, this.config.heartbeatAttemptsWait)
        })
      }
    }

    if (lastErr) {
      let err = new Error(`Version failed to start properly: ${lastErr.message}`)
      err.stack = lastErr.stack
      return Promise.reject(err)
    }

    // Only start respawning crashed workers once the version is known good.
    if (this.config.cluster && !this.isSlave) {
      this.__clusterWorkerDied = this.workerDied.bind(this)
      this.cluster.on('exit', this.__clusterWorkerDied)
    }
  }

  /**
   * Stop the running version: emit 'running' with '', stop respawning and
   * kill all cluster workers (when clustering from a non-worker process),
   * and close the http server.
   *
   * @returns {*} Whatever `http.closeServer()` returns.
   */
  closeServer() {
    this.ctx.version = ''
    this.emit('running', this.ctx.version)

    if (this.config.cluster && !this.isSlave) {
      // Remove the respawn handler first so killed workers are not reforked.
      if (this.__clusterWorkerDied) {
        this.cluster.off('exit', this.__clusterWorkerDied)
      }
      this.__clusterWorkerDied = null

      for (let i = 1; i <= this.config.cluster; i++) {
        if (this.workers[i]) {
          this.workers[i].process.kill()
          this.workers[i] = null
        }
      }
    }

    return this.http.closeServer()
  }
}