Finished implementing cluster support

This commit is contained in:
Jonatan Nilsson 2022-03-10 11:06:17 +00:00
parent 47344c5e7a
commit e0da74f0be
19 changed files with 1382 additions and 499 deletions

View file

@ -29,8 +29,8 @@ test_script:
ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
chmod -R 777 /appveyor/projects chmod -R 777 /appveyor/projects
npm install npm install
npm test npm run test -- --ignore-only
npm run test:integration npm run test:integration -- --ignore-only
artifacts: artifacts:
- path: ./*_sc-core.7z - path: ./*_sc-core.7z
@ -38,7 +38,9 @@ artifacts:
# on successful build # on successful build
on_success: on_success:
- sh: | - sh: |
apk add curl jq if [ ! apk add curl jq ]; then
exit 1
fi
CURR_VER=$(cat package.json | jq -r .version) CURR_VER=$(cat package.json | jq -r .version)
if curl -s -X GET -H "Authorization: token $deploytoken" https://git.nfp.is/api/v1/repos/$APPVEYOR_REPO_NAME/releases | grep -o "\"name\"\:\"v${CURR_VER}\"" > /dev/null; then if curl -s -X GET -H "Authorization: token $deploytoken" https://git.nfp.is/api/v1/repos/$APPVEYOR_REPO_NAME/releases | grep -o "\"name\"\:\"v${CURR_VER}\"" > /dev/null; then
echo "Release already exists, nothing to do."; echo "Release already exists, nothing to do.";

View file

@ -1,4 +1,6 @@
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import cluster from 'cluster'
import fs from 'fs/promises' import fs from 'fs/promises'
import { request } from './client.mjs' import { request } from './client.mjs'
import HttpServer from './http.mjs' import HttpServer from './http.mjs'
@ -21,6 +23,7 @@ export default class Application extends EventEmitter {
this.http = new HttpServer(this.config) this.http = new HttpServer(this.config)
this.module = null this.module = null
this.running = false this.running = false
this.workers = {}
// Fresh is used to indicate that when we run the application and it fails, // Fresh is used to indicate that when we run the application and it fails,
// whether the environment we are in was fresh or not. An example would be // whether the environment we are in was fresh or not. An example would be
@ -38,18 +41,34 @@ export default class Application extends EventEmitter {
this.config.heartbeatAttempts = this.config.heartbeatAttempts || 5 this.config.heartbeatAttempts = this.config.heartbeatAttempts || 5
this.config.heartbeatAttemptsWait = this.config.heartbeatAttemptsWait || (2 * 1000) this.config.heartbeatAttemptsWait = this.config.heartbeatAttemptsWait || (2 * 1000)
this.config.heartbeatPath = this.config.heartbeatPath || '/' this.config.heartbeatPath = this.config.heartbeatPath || '/'
this.config.clusterWaitOnCrash = this.config.clusterWaitOnCrash || (1 * 1000)
Object.assign(this, { Object.assign(this, {
setInterval: opts.setInterval || setInterval, setInterval: opts.setInterval || setInterval,
setTimeout: opts.setTimeout || setTimeout,
fs: opts.fs || fs, fs: opts.fs || fs,
cluster: opts.cluster || cluster
}) })
this.isSlave = this.cluster.isWorker
this.monitoringCluster = false
if (this.config.cluster && !this.isSlave) {
for (let i = 1; i <= this.config.cluster; i++) {
this.workers[i] = null
}
}
this.ctx.db.addApplication(name) this.ctx.db.addApplication(name)
} }
startAutoupdater() { startAutoupdater() {
if (this.provider.static) return if (this.provider.static) return
if (this.config.updateEvery === 0) return if (this.config.updateEvery === 0) return
if (this.isSlave) {
this.ctx.log.warn('app.startAutoupdater was called from within a slave')
return
}
let timer = this.setInterval(() => { let timer = this.setInterval(() => {
this.update().then( this.update().then(
@ -64,15 +83,19 @@ export default class Application extends EventEmitter {
timer.unref() timer.unref()
} }
updateLog(message) { updateLog(message, level = 'info') {
this.ctx.db.data.core[this.name].updater += message this.ctx.db.data.core[this.name].updater += message
this.ctx.log.info(message) this.ctx.log[level](message)
return message return message
} }
msgStatic = 'Provider in question is static and so no update required, nothing to do.' msgStatic = 'Provider in question is static and so no update required, nothing to do.'
update() { update() {
if (this.isSlave) {
this.ctx.log.warn('app.update was called from within a slave')
return null
}
if (this.provider.static) { if (this.provider.static) {
if (this.ctx.db.data.core[this.name].updater !== this.msgStatic) { if (this.ctx.db.data.core[this.name].updater !== this.msgStatic) {
this.ctx.db.data.core[this.name].updater = '' this.ctx.db.data.core[this.name].updater = ''
@ -122,19 +145,20 @@ export default class Application extends EventEmitter {
let latest = null let latest = null
try { try {
log += this.updateLog(`Checking for latest version at ${new Date().toISOString().replace('T', ' ').split('.')[0]}. `) + '\n' log += this.updateLog(`Checking for latest version at ${new Date().toISOString().replace('T', ' ').split('.')[0]}. `, 'debug') + '\n'
// Get the latest version from our provider // Get the latest version from our provider
latest = await this.provider.getLatestVersion() latest = await this.provider.getLatestVersion()
log += this.updateLog(`Found ${latest.version}. `) + '\n'
// If the versino matches the latest installed, then there's nothing to do // If the versino matches the latest installed, then there's nothing to do
if (this.ctx.db.data.core[this.name].latestInstalled === latest.version) { if (this.ctx.db.data.core[this.name].latestInstalled === latest.version) {
this.updateLog('Already up to date, nothing to do. ') log += this.updateLog(`Found ${latest.version}. `, 'debug') + '\n'
this.updateLog('Already up to date, nothing to do. ', 'debug')
return null return null
} }
log += this.updateLog(`Found new update ${latest.version}. `) + '\n'
// Make the id for the vesion the version number. Allows for easy lookup // Make the id for the vesion the version number. Allows for easy lookup
// among other nice and simple structure. // among other nice and simple structure.
latest.id = latest.version latest.id = latest.version
@ -241,7 +265,7 @@ export default class Application extends EventEmitter {
// For some weird reason, --loglevel=notice is required otherwise // For some weird reason, --loglevel=notice is required otherwise
// we get practically zero log output. // we get practically zero log output.
await this.ctx.util.runCommand( await this.ctx.util.runCommand(
'npm.cmd', this.ctx.util.getNpmExecutable(),
['install', '--production', '--no-optional', '--no-package-lock', '--no-audit', '--loglevel=notice'], ['install', '--production', '--no-optional', '--no-package-lock', '--no-audit', '--loglevel=notice'],
folder, folder,
function(msg) { log += msg } function(msg) { log += msg }
@ -291,56 +315,128 @@ export default class Application extends EventEmitter {
this.module = module this.module = module
} }
workerDied(worker) {
if (this.workers[worker.w_id] !== worker) {
return
}
this.workers[worker.w_id] = null
this.setTimeout(() => {
if (this.workers[worker.w_id]) return
this.startForkProcess(worker.w_id, this.ctx.db.data.core[this.name].active)
}, this.clusterWaitOnCrash)
}
startForkProcess(i, version) {
this.workers[i] = this.cluster.fork({
CLUSTER_APP_NAME: this.name,
CLUSTER_APP_VERSION: version,
CLUSTER_APP_INDEX: i,
})
let pid = this.workers[i].process.pid
this.ctx.log.info(`Creating worker ${i} (${pid})`)
// this.workers[i].once('listening', () => {
// this.ctx.log.info(`Fork ${i} is alive and listening`)
// })
this.workers[i].once('exit', (code, signal) => {
if (signal) {
// this.ctx.log.info(`Worker ${i} (${pid}) was killed by signal ${signal}`)
} else if (code !== 0) {
this.ctx.log.warn(`Worker ${i} (${pid}) exited with code ${code}`)
}
})
this.workers[i].w_id = i
this.workers[i].listening = false
this.workers[i].started = new Date()
}
async runVersion(version) { async runVersion(version) {
this.ctx.db.data.core[this.name].active = version this.ctx.db.data.core[this.name].active = version
await this.ctx.db.write() await this.ctx.db.write()
if (version !== 'static') { if (this.config.cluster && !this.isSlave) {
let indexPath = this.ctx.util.getPathFromRoot(`./${this.name}/${version}/index.mjs`) for (let i = 1; i <= this.config.cluster; i++) {
await this.fs.stat(indexPath).catch((err) => { this.startForkProcess(i, version)
return Promise.reject(new Error(`Version was missing index.mjs: ${err.message}`)) }
} else {
if (version !== 'static') {
let indexPath = this.ctx.util.getPathFromRoot(`./${this.name}/${version}/index.mjs`)
await this.fs.stat(indexPath).catch((err) => {
return Promise.reject(new Error(`Version was missing index.mjs: ${err.message}`))
})
this.fresh = false
let module = await import(this.ctx.util.getUrlFromRoot(`./${this.name}/${version}/index.mjs`))
this.registerModule(module, version)
} else {
this.fresh = false
}
let errTimeout = new Error(`Version timed out (took over ${this.config.startWaitUntilFail}ms) while running start()`)
await new Promise((res, rej) => {
setTimeout(() => {
rej(errTimeout)
}, this.config.startWaitUntilFail)
let startRes = this.module.start(this.http, this.config.port, this.ctx)
if (startRes && startRes.then) {
return startRes.then(res, rej)
}
res()
}) })
this.fresh = false if (!this.http.active) {
let module = await import(this.ctx.util.getUrlFromRoot(`./${this.name}/${version}/index.mjs`)) return Promise.reject(new Error(`Version did not call http.createServer()`))
this.registerModule(module, version) }
} else {
this.fresh = false
} }
let errTimeout = new Error(`Version timed out (took over ${this.config.startWaitUntilFail}ms) while running start()`) if (this.isSlave) {
return
await new Promise((res, rej) => {
setTimeout(() => {
rej(errTimeout)
}, this.config.startWaitUntilFail)
let startRes = this.module.start(this.http, this.config.port, this.ctx)
if (startRes && startRes.then) {
return startRes.then(res, rej)
}
res()
})
if (!this.http.active) {
return Promise.reject(new Error(`Version did not call http.createServer()`))
} }
let lastErr = null let lastErr = null
for (let i = 0; i < this.config.heartbeatAttempts; i++) { for (let i = 0; i < this.config.heartbeatAttempts; i++) {
try { try {
await request({ timeout: this.config.heartbeatAttemptsWait }, `http://localhost:${this.config.port}` + this.config.heartbeatPath, null, 0, true) await request({ timeout: this.config.heartbeatTimeout }, `http://localhost:${this.config.port}` + this.config.heartbeatPath, null, 0, true)
return lastErr = null
break
} catch (err) { } catch (err) {
lastErr = err lastErr = err
await new Promise((res) => {
setTimeout(res, this.config.heartbeatAttemptsWait)
})
// this.config.heartbeatAttemptsWait
} }
} }
return Promise.reject(new Error(`Version failed to start properly: ${lastErr.message}`)) if (lastErr) {
let err = new Error(`Version failed to start properly: ${lastErr.message}`)
err.stack = lastErr.stack
return Promise.reject(err)
}
if (this.config.cluster && !this.isSlave) {
this.__clusterWorkerDied = this.workerDied.bind(this)
this.cluster.on('exit', this.__clusterWorkerDied)
}
} }
closeServer() { closeServer() {
if (this.config.cluster && !this.isSlave) {
if (this.__clusterWorkerDied) {
this.cluster.off('exit', this.__clusterWorkerDied)
}
this.__clusterWorkerDied = null
for (let i = 1; i <= this.config.cluster; i++) {
if (this.workers[i]) {
this.workers[i].process.kill()
this.workers[i] = null
}
}
}
return this.http.closeServer() return this.http.closeServer()
} }
} }

View file

@ -2,6 +2,7 @@ import http from 'http'
import https from 'https' import https from 'https'
import stream from 'stream/promises' import stream from 'stream/promises'
import fs from 'fs' import fs from 'fs'
import Util from './util.mjs'
function resolveRelative(from, to) { function resolveRelative(from, to) {
const resolvedUrl = new URL(to, new URL(from, 'resolve://')); const resolvedUrl = new URL(to, new URL(from, 'resolve://'));
@ -35,7 +36,7 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
let req = null let req = null
return new Promise(function(resolve, reject) { let orgErr = new Error(); return new Promise(function(resolve, reject) {
if (!path) { if (!path) {
return reject(new Error('Request path was empty')) return reject(new Error('Request path was empty'))
} }
@ -52,7 +53,7 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
let timer = setTimeout(function() { let timer = setTimeout(function() {
timedout = true timedout = true
if (req) { req.destroy() } if (req) { req.destroy() }
reject(new Error(`Request ${path} timed out after ${timeout}ms`)) reject(Util.combineStack(new Error(`Request ${path} timed out after ${timeout}ms`), orgErr))
}, timeout) }, timeout)
req = h.request({ req = h.request({
@ -81,7 +82,7 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
}) })
}, function(err) { }, function(err) {
if (err.code === 'ABORT_ERR') return if (err.code === 'ABORT_ERR') return
reject(err) reject(Util.combineStack(err, orgErr))
}) })
// let file = fs.createWriteStream(filePath) // let file = fs.createWriteStream(filePath)
// res.pipe(file) // res.pipe(file)
@ -113,11 +114,11 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
if (err) { if (err) {
ac.abort() ac.abort()
if (!filePath) return reject(err) if (!filePath) return reject(Util.combineStack(err, orgErr))
// Do some cleanup in case we were in the middle of downloading file // Do some cleanup in case we were in the middle of downloading file
return fs.rm(filePath, function() { return fs.rm(filePath, function() {
reject(err) reject(Util.combineStack(err))
}) })
} }
// Let the pipeline do the resolving so it can finish flusing before calling resolve // Let the pipeline do the resolving so it can finish flusing before calling resolve
@ -137,11 +138,11 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
if (timedout) return if (timedout) return
let wrapped = new Error(`Error during request ${path}: ${err.message}`) let wrapped = new Error(`Error during request ${path}: ${err.message}`)
wrapped.code = err.code wrapped.code = err.code
reject(wrapped) reject(Util.combineStack(wrapped, orgErr))
}) })
req.on('timeout', function(err) { req.on('timeout', function(err) {
if (timedout) return if (timedout) return
reject(err) reject(Util.combineStack(err, orgErr))
}) })
req.end() req.end()
@ -155,9 +156,9 @@ export function request(config, path, filePath = null, redirects, fastRaw = fals
res.body = JSON.parse(res.body) res.body = JSON.parse(res.body)
} catch(e) { } catch(e) {
if (res.body.indexOf('<!DOCTYPE') < 100 || res.body.indexOf('<html') < 100) { if (res.body.indexOf('<!DOCTYPE') < 100 || res.body.indexOf('<html') < 100) {
return Promise.reject(new Error('Error parsing body, expected JSON but got HTML instead: ' + res.body)) return Promise.reject(Util.combineStack(new Error('Error parsing body, expected JSON but got HTML instead: ' + res.body), orgErr))
} }
return Promise.reject(new Error(`Error parsing body ${res.body}: ${e.message}`)) return Promise.reject(Util.combineStack(new Error(`Error parsing body ${res.body}: ${e.message}`), orgErr))
} }
} }
} }

View file

@ -1,7 +1,9 @@
import cluster from 'cluster'
import { Low } from 'lowdb'
import Application from './application.mjs' import Application from './application.mjs'
import Util from './util.mjs' import Util from './util.mjs'
import getLog from './log.mjs' import getLog from './log.mjs'
import { Low } from 'lowdb'
import StaticProvider from './providers/static.mjs' import StaticProvider from './providers/static.mjs'
import GitProvider from './providers/git.mjs' import GitProvider from './providers/git.mjs'
@ -44,6 +46,7 @@ export default class Core {
this.applications = [] this.applications = []
this.applicationMap = new Map() this.applicationMap = new Map()
this._applicationFatalCrash = null this._applicationFatalCrash = null
this.isSlave = cluster.isWorker
} }
getApplication(name) { getApplication(name) {
@ -59,15 +62,27 @@ export default class Core {
this.log.info(`Found applications: ${names.join(', ')}.`) this.log.info(`Found applications: ${names.join(', ')}.`)
for (let name of names) { for (let name of names) {
if (this.isSlave && process.env.CLUSTER_APP_NAME !== name) {
continue
}
try { try {
let provConstructor = Core.providers.get(this.db.config[name].provider) let provConstructor = Core.providers.get(this.db.config[name].provider)
let provider = new provConstructor(this.db.config[name]) let provider = new provConstructor(this.db.config[name])
await provider.checkConfig(this.db.config[name])
if (!this.isSlave) {
await provider.checkConfig(this.db.config[name])
}
let logName = name
if (this.isSlave && process.env.CLUSTER_APP_INDEX) {
logName += '-' + process.env.CLUSTER_APP_INDEX
}
let application = new Application({ let application = new Application({
db: this.db, db: this.db,
util: this.util, util: this.util,
log: getLog(name, this.db.config[name].log || null), log: getLog(logName, this.db.config[name].log || null),
core: this, core: this,
}, provider, name) }, provider, name)
this.applications.push(application) this.applications.push(application)
@ -86,15 +101,28 @@ export default class Core {
if (this.running) return if (this.running) return
this.running = true this.running = true
this.log.info(`Running updater on ${this.applications.length} apps`) if (!this.isSlave) {
await Promise.all(this.applications.map((app) => { this.log.info(`Running updater on ${this.applications.length} apps`)
return app.update().catch(err => { await Promise.all(this.applications.map((app) => {
app.ctx.log.error(err, `Error updating: ${err.message}`) return app.update().catch(err => {
}) app.ctx.log.error(err, `Error updating: ${err.message}`)
})) })
}))
}
let found = false let found = false
if (this.isSlave) {
let app = this.getApplication(process.env.CLUSTER_APP_NAME)
try {
await app.runVersion(process.env.CLUSTER_APP_VERSION)
} catch (err) {
app.ctx.log.fatal(err)
return Promise.reject(err)
}
return
}
for (let app of this.applications) { for (let app of this.applications) {
app.startAutoupdater() app.startAutoupdater()
@ -136,6 +164,7 @@ export default class Core {
application.ctx.log.warn(`Restarting for ${version.version} due to last run failing while not being in fresh state`) application.ctx.log.warn(`Restarting for ${version.version} due to last run failing while not being in fresh state`)
return this.restart(`Application ${name} has fresh false while attempting to run ${version.version} with stable -1`) return this.restart(`Application ${name} has fresh false while attempting to run ${version.version} with stable -1`)
} }
await application.closeServer() await application.closeServer()
this._applicationFatalCrash = this.criticalError.bind(this, application, version) this._applicationFatalCrash = this.criticalError.bind(this, application, version)
@ -149,6 +178,7 @@ export default class Core {
found = true found = true
version.stable = 1 version.stable = 1
await this.db.write() await this.db.write()
application.ctx.log.info(`${version.version} is up and running`)
break break
} catch(err) { } catch(err) {
application.ctx.log.error(err, `Error starting ${version.version}: ${err.message}`) application.ctx.log.error(err, `Error starting ${version.version}: ${err.message}`)

View file

@ -100,14 +100,15 @@ export default function GetDB(config, log, orgFilename = 'db.json') {
db.write = function() { db.write = function() {
return this._write() return this._write()
// Do couple of retries. Sometimes it fails randomly doing atomic writes. // Do couple of retries. Sometimes it fails randomly doing atomic writes.
.catch(() => { return setTimeout(20) }) .catch(() => { return setTimeout(20).then(() => { return this._write() }) })
.then(() => { return this._write() }) .catch(() => { return setTimeout(50).then(() => { return this._write() }) })
.catch(() => { return setTimeout(50) }) .catch(() => { return setTimeout(100).then(() => { return this._write() }) })
.then(() => { return this._write() })
.catch(() => { return setTimeout(100) })
.then(() => { return this._write() })
.catch((err) => { .catch((err) => {
this.log.error(err, 'Error saving to db') try {
db.writeSync()
} catch {
this.log.error(err, 'Error saving to db')
}
}) })
} }

View file

@ -2,6 +2,22 @@
import bunyan from 'bunyan-lite' import bunyan from 'bunyan-lite'
import { setTimeout } from 'timers/promises' import { setTimeout } from 'timers/promises'
export function getDefaultStreams() {
if (process.env.NODE_ENV === 'production') {
return [{
path: 'log.log',
level: 'info',
}
]
} else {
return [{
"stream": process.stdout,
"level": "debug"
}
]
}
}
export default function getLog(name, streams = null, opts = {}) { export default function getLog(name, streams = null, opts = {}) {
let settings let settings
let ringbuffer = new bunyan.RingBuffer({ limit: 100 }) let ringbuffer = new bunyan.RingBuffer({ limit: 100 })
@ -15,24 +31,9 @@ export default function getLog(name, streams = null, opts = {}) {
}) })
} }
if (process.env.NODE_ENV === 'production') { settings = {
settings = { name: name,
"name": name, streams: streams || getDefaultStreams()
"streams": streams || [{
path: 'log.log',
level: 'info',
}
]
}
} else {
settings = {
"name": name,
"streams": streams || [{
"stream": process.stdout,
"level": "debug"
}
]
}
} }
let logger let logger
@ -122,6 +123,13 @@ export default function getLog(name, streams = null, opts = {}) {
error: function() { return Promise.resolve() }, error: function() { return Promise.resolve() },
} }
} }
logger.on('error', function(err) {
if (process.env.NODE_ENV === 'production') {
logger.event.warn(`Error in logger ${name}: ${err.message}`)
} else {
console.log(`Error in logger ${name}: ${err.message}`)
}
})
logger.ringbuffer = ringbuffer logger.ringbuffer = ringbuffer
logger.ringbufferwarn = ringbufferwarn logger.ringbufferwarn = ringbufferwarn

View file

@ -1,6 +1,7 @@
import cluster from 'cluster'
import Util from './util.mjs' import Util from './util.mjs'
import fs from 'fs/promises' import fs from 'fs/promises'
import getLog from './log.mjs' import getLog, { getDefaultStreams } from './log.mjs'
import GetDB from './db.mjs' import GetDB from './db.mjs'
import Core from './core.mjs' import Core from './core.mjs'
@ -22,7 +23,13 @@ export async function runner(root_import_meta_url, configname = 'config.json', d
} }
} }
const log = getLog(config.name) let streams = getDefaultStreams()
if (cluster.isWorker) {
streams[0].level = 'error'
}
const log = getLog(config.name, streams)
runner.log = log runner.log = log
const db = await GetDB(config, log, util.getPathFromRoot('./' + dbname)) const db = await GetDB(config, log, util.getPathFromRoot('./' + dbname))

View file

@ -5,6 +5,11 @@ import { spawn, execSync } from 'child_process'
import { fileURLToPath, pathToFileURL } from 'url' import { fileURLToPath, pathToFileURL } from 'url'
export default class Util { export default class Util {
static combineStack(err, appendErr) {
err.stack = err.stack + '\nFrom:\n' + appendErr.stack.split('\n').slice(1).join('\n') + '\n'
return err
}
constructor(root_import_meta_url) { constructor(root_import_meta_url) {
this._root_import_meta_url = root_import_meta_url this._root_import_meta_url = root_import_meta_url
} }
@ -79,6 +84,10 @@ export default class Util {
if (!valid) if (!valid)
continue continue
} }
if (config[key].cluster != null && (typeof(config[key].cluster) !== 'number' || config[key].cluster > 100 || config[key].cluster < 0))
continue
if (config[key].clusterWaitOnCrash != null && (typeof(config[key].clusterWaitOnCrash) !== 'number' || config[key].clusterWaitOnCrash < 10))
continue
out.push(key) out.push(key)
} }
@ -93,6 +102,13 @@ export default class Util {
return util.getPathFromRoot('../bin/7zdec') return util.getPathFromRoot('../bin/7zdec')
} }
getNpmExecutable() {
if (process.platform === 'win32') {
return 'npm.cmd'
}
return 'npm'
}
verifyConfig(config) { verifyConfig(config) {
if (!config.name) throw new Error('name is missing in config.json') if (!config.name) throw new Error('name is missing in config.json')
if (this.getAppNames(config).length === 0) throw new Error('no application was found in config') if (this.getAppNames(config).length === 0) throw new Error('no application was found in config')

View file

@ -5,7 +5,7 @@
"main": "index.mjs", "main": "index.mjs",
"scripts": { "scripts": {
"dev": "nodemon --watch dev/api --watch core --watch runner.mjs --watch db.mjs --watch log.mjs runner.mjs | bunyan", "dev": "nodemon --watch dev/api --watch core --watch runner.mjs --watch db.mjs --watch log.mjs runner.mjs | bunyan",
"test": "eltro \"test/**/*.test.mjs\" -r dot", "test": "eltro -t 250 \"test/**/*.test.mjs\" -r dot",
"test:integration": "eltro \"test/**/*.test.integration.mjs\" -r list", "test:integration": "eltro \"test/**/*.test.integration.mjs\" -r list",
"test:test": "eltro \"test/application.test.integration.mjs\" -r list", "test:test": "eltro \"test/application.test.integration.mjs\" -r list",
"test:spec": "eltro \"test/**/*.test.mjs\" -r list", "test:spec": "eltro \"test/**/*.test.mjs\" -r list",
@ -52,6 +52,6 @@
"lowdb": "^3.0.0" "lowdb": "^3.0.0"
}, },
"devDependencies": { "devDependencies": {
"eltro": "^1.3.0" "eltro": "^1.3.1"
} }
} }

View file

@ -0,0 +1,122 @@
import { Eltro as t, assert, stub } from 'eltro'
import fs from 'fs/promises'
import Application from '../core/application.mjs'
import Util from '../core/util.mjs'
import lowdb from '../core/db.mjs'
import StaticProvider from '../core/providers/static.mjs'
import { createFakeContext } from './helpers.mjs'
import HttpServer from '../core/http.mjs'
const util = new Util(import.meta.url)
t.describe('#runVersion("version") cluster mode', function() {
const assertPort = 22345
const assertClusterSize = Math.floor(Math.random() * (8 - 2 + 1) + 2)
const http = new HttpServer()
let handler
let cluster
let workers
let ctx
let app
t.beforeEach(function() {
return createFakeContext({ testnoexisting: { cluster: assertClusterSize } }, util, null)
.then(function(res) {
workers = []
ctx = res
let provider = new StaticProvider()
cluster = {
on: stub(),
off: stub(),
fork: stub().returnWith(function() {
let worker = {
on: stub(),
once: stub(),
process: {
kill: stub(),
}
}
workers.push(worker)
return worker
}),
isWorker: false,
}
app = new Application(ctx, provider, 'testnoexisting', { cluster })
app.config.port = assertPort
app.workerDied = stub()
return app.ctx.db.write()
})
})
t.afterEach(function() {
return Promise.all([
app.closeServer(),
])
})
t.before(function() {
handler = function(req, res) {
res.writeHead(200)
res.end('{}')
}
let server = http.createServer(function(req, res) {
req.on('error', function() { })
res.on('error', function() { })
res.on('finish', function() { })
handler(req, res)
})
return server.listenAsync(assertPort)
})
t.after(function() {
return http.closeServer()
})
t.test('should call fork for entire cluster and attach handlers correctly', async function() {
let lastErr = null
for (let i = 1; i <= assertClusterSize; i++) {
assert.strictEqual(app.workers[i], null)
}
handler = function(req, res) {
try {
assert.notOk(cluster.on.called)
assert.notOk(app.__clusterWorkerDied)
lastErr = false
} catch (err) {
lastErr = err
}
res.writeHead(200)
res.end('{}')
}
const assertVersion = 'v11.22.33'
await app.runVersion(assertVersion)
assert.strictEqual(cluster.fork.callCount, assertClusterSize)
for (let i = 1; i <= assertClusterSize; i++) {
assert.ok(app.workers[i])
assert.strictEqual(app.workers[i], workers[i - 1])
assert.strictEqual(cluster.fork.getCallN(i)[0].CLUSTER_APP_NAME, app.name)
assert.strictEqual(cluster.fork.getCallN(i)[0].CLUSTER_APP_VERSION, assertVersion)
assert.strictEqual(app.workers[i].w_id, i)
assert.strictEqual(app.workers[i].listening, false)
assert.ok(app.workers[i].started)
assert.ok(new Date() - app.workers[i].started < 100 && new Date() - app.workers[i].started >= 0)
}
assert.strictEqual(lastErr, false)
assert.strictEqual(cluster.on.callCount, 1)
assert.ok(app.__clusterWorkerDied)
assert.strictEqual(cluster.on.firstCall[0], 'exit')
assert.strictEqual(cluster.on.firstCall[1], app.__clusterWorkerDied)
assert.notStrictEqual(app.workerDied, app.__clusterWorkerDied)
assert.notOk(app.workerDied.called)
app.__clusterWorkerDied()
assert.ok(app.workerDied.called)
})
})

View file

@ -15,7 +15,7 @@ t.describe('#runVersion("static")', function() {
const defaultHandler = function(orgHandler) { const defaultHandler = function(orgHandler) {
let handler = orgHandler || function (req, res) { let handler = orgHandler || function (req, res) {
res.writeHead(204); res.end(JSON.stringify({ a: 1 })) res.writeHead(200); res.end(JSON.stringify({ a: 1 }))
} }
return function(http, port, ctx) { return function(http, port, ctx) {
const server = http.createServer(handler) const server = http.createServer(handler)
@ -36,7 +36,7 @@ t.describe('#runVersion("static")', function() {
}) })
t.afterEach(function() { t.afterEach(function() {
return app.http.closeServer() return app.closeServer()
}) })
t.test('should throw if http is not called', async function() { t.test('should throw if http is not called', async function() {
@ -98,6 +98,7 @@ t.describe('#runVersion("static")', function() {
res.end(JSON.stringify({ a: 1 })) res.end(JSON.stringify({ a: 1 }))
} }
app.config.heartbeatAttempts = 3 app.config.heartbeatAttempts = 3
app.config.heartbeatAttemptsWait = 5
app.registerModule(defaultHandler(handler)) app.registerModule(defaultHandler(handler))
assert.strictEqual(app.fresh, true) assert.strictEqual(app.fresh, true)
@ -116,7 +117,8 @@ t.describe('#runVersion("static")', function() {
called++ called++
} }
app.config.heartbeatAttempts = 2 app.config.heartbeatAttempts = 2
app.config.heartbeatAttemptsWait = 30 app.config.heartbeatTimeout = 30
app.config.heartbeatAttemptsWait = 10
app.registerModule(defaultHandler(handler)) app.registerModule(defaultHandler(handler))
assert.strictEqual(app.fresh, true) assert.strictEqual(app.fresh, true)
@ -129,7 +131,7 @@ t.describe('#runVersion("static")', function() {
assert.match(err.message, /time/i) assert.match(err.message, /time/i)
assert.match(err.message, /out/i) assert.match(err.message, /out/i)
assert.match(err.message, /30ms/i) assert.match(err.message, /30ms/i)
assert.ok(end - start > app.config.heartbeatAttempts * app.config.heartbeatAttemptsWait) assert.ok(end - start > app.config.heartbeatAttempts * (app.config.heartbeatTimeout + app.config.heartbeatAttemptsWait))
assert.strictEqual(called, 2) assert.strictEqual(called, 2)
assert.strictEqual(ctx.db.data.core.testapp.active, 'static') assert.strictEqual(ctx.db.data.core.testapp.active, 'static')
}) })
@ -145,13 +147,14 @@ t.describe('#runVersion("static")', function() {
} }
} }
app.config.heartbeatAttempts = 3 app.config.heartbeatAttempts = 3
app.config.heartbeatAttemptsWait = 5
app.registerModule(defaultHandler(handler)) app.registerModule(defaultHandler(handler))
let err = await assert.isRejected(app.runVersion('static')) let err = await assert.isRejected(app.runVersion('static'))
assert.match(err.message, /failed/i) assert.match(err.message, /failed/i)
assert.match(err.message, /400/i) assert.match(err.message, /400/i)
await app.http.closeServer() await app.closeServer()
app.registerModule(defaultHandler(handler)) app.registerModule(defaultHandler(handler))
app.config.heartbeatPath = assertPath app.config.heartbeatPath = assertPath
@ -159,6 +162,24 @@ t.describe('#runVersion("static")', function() {
assert.strictEqual(ctx.db.data.core.testapp.active, 'static') assert.strictEqual(ctx.db.data.core.testapp.active, 'static')
}) })
t.test('should not check heartbeat if slave', async function() {
let called = 0
const handler = function(req, res) {
called++
res.statusCode = 400
res.end(JSON.stringify({ a: 1 }))
}
app.registerModule(defaultHandler(handler))
app.isSlave = true
assert.strictEqual(app.fresh, true)
await app.runVersion('static')
assert.strictEqual(app.fresh, false)
assert.strictEqual(called, 0)
assert.strictEqual(ctx.db.data.core.testapp.active, 'static')
})
}) })
t.describe('#runVersion("version")', function() { t.describe('#runVersion("version")', function() {
@ -190,7 +211,7 @@ t.describe('#runVersion("version")', function() {
t.afterEach(function() { t.afterEach(function() {
return Promise.all([ return Promise.all([
fs.rm(assertConfig), fs.rm(assertConfig),
app.http.closeServer(), app.closeServer(),
]) ])
}) })

View file

@ -32,7 +32,7 @@ t.after(function() {
}) })
}) })
t.timeout(10000).test('should run update and install correctly', async function(){ t.skip().timeout(10000).test('should run update and install correctly', async function(){
try { try {
await app.update() await app.update()
} catch (err) { } catch (err) {

View file

@ -58,19 +58,84 @@ t.describe('constructor()', function() {
assert.strictEqual(app.config.heartbeatAttempts, 5) assert.strictEqual(app.config.heartbeatAttempts, 5)
assert.strictEqual(app.config.heartbeatAttemptsWait, 2 * 1000) assert.strictEqual(app.config.heartbeatAttemptsWait, 2 * 1000)
assert.strictEqual(app.config.heartbeatPath, '/') assert.strictEqual(app.config.heartbeatPath, '/')
assert.strictEqual(app.config.clusterWaitOnCrash, 1 * 1000)
assert.strictEqual(app.ctx.db, ctx.db) assert.strictEqual(app.ctx.db, ctx.db)
assert.strictEqual(app.ctx.app, app) assert.strictEqual(app.ctx.app, app)
assert.strictEqual(app.ctx.util, ctx.util) assert.strictEqual(app.ctx.util, ctx.util)
assert.strictEqual(app.name, assertName) assert.strictEqual(app.name, assertName)
assert.strictEqual(app.fresh, true) assert.strictEqual(app.fresh, true)
assert.strictEqual(app.running, false) assert.strictEqual(app.running, false)
assert.strictEqual(app.monitoringCluster, false)
assert.deepStrictEqual(app.workers, {})
assert.strictEqual(app.isSlave, false)
assert.ok(app.http) assert.ok(app.http)
assert.ok(app.http.sockets) assert.ok(app.http.sockets)
assert.strictEqual(typeof(app.http.createServer), 'function') assert.strictEqual(typeof(app.http.createServer), 'function')
assert.strictEqual(typeof(app.http.closeServer), 'function') assert.strictEqual(typeof(app.http.closeServer), 'function')
}) })
t.test('should support overriding defaults', function() {
const assertTest = {
a: 1,
updateEvery: 10,
startWaitUntilFail: 10,
heartbeatTimeout: 10,
heartbeatAttempts: 10,
heartbeatAttemptsWait: 10,
heartbeatPath: '/asdf',
clusterWaitOnCrash: 10,
}
const assertName = 'test'
ctx.db.config = {
test: assertTest,
app: { b: 2},
manage: { c: 3 },
}
let app = new Application(ctx, {}, assertName)
assert.notStrictEqual(app.config, assertTest)
assert.strictEqual(app.config.a, assertTest.a)
assert.strictEqual(app.config.updateEvery, 10)
assert.strictEqual(app.config.startWaitUntilFail, 10)
assert.strictEqual(app.config.heartbeatTimeout, 10)
assert.strictEqual(app.config.heartbeatAttempts, 10)
assert.strictEqual(app.config.heartbeatAttemptsWait, 10)
assert.strictEqual(app.config.heartbeatPath, '/asdf')
assert.strictEqual(app.config.clusterWaitOnCrash, 10)
})
t.test('should fill out workers indexes if is master', function() {
const assertTest = { a: 1, cluster: 2 }
const assertName = 'test'
ctx.db.config = {
test: assertTest,
app: { b: 2},
manage: { c: 3 },
}
let app = new Application(ctx, {}, assertName)
assert.notStrictEqual(app.config, assertTest)
assert.strictEqual(app.config.a, assertTest.a)
assert.deepStrictEqual(app.workers, {
1: null,
2: null,
})
})
t.test('should leave workers empty if not master', function() {
const assertTest = { a: 1, cluster: 2 }
const assertName = 'test'
ctx.db.config = {
test: assertTest,
app: { b: 2},
manage: { c: 3 },
}
let app = new Application(ctx, {}, assertName, { cluster: { isWorker: true } })
assert.notStrictEqual(app.config, assertTest)
assert.strictEqual(app.config.a, assertTest.a)
assert.deepStrictEqual(app.workers, {})
})
t.test('should not default updateEvery if its zero', function() { t.test('should not default updateEvery if its zero', function() {
const assertTest = { a: 1, updateEvery: 0 } const assertTest = { a: 1, updateEvery: 0 }
@ -107,7 +172,7 @@ t.describe('constructor()', function() {
}) })
}) })
t.timeout(250).describe('#startAutoupdater()', function() { t.describe('#startAutoupdater()', function() {
let ctx let ctx
t.beforeEach(function() { t.beforeEach(function() {
@ -124,6 +189,20 @@ t.timeout(250).describe('#startAutoupdater()', function() {
app.startAutoupdater() app.startAutoupdater()
}) })
t.test('should do nothing if isSlave but should warn', async function() {
const stubInterval = stub()
stubInterval.throws(new Error('should not be seen'))
let app = new Application(ctx, { }, 'teststatic', { setInterval: stubInterval })
app.ctx.log.warn.reset()
assert.notOk(app.ctx.log.warn.called)
app.isSlave = true
app.startAutoupdater()
assert.ok(app.ctx.log.warn.called)
assert.match(app.ctx.log.warn.firstCall[0], /slave/i)
})
t.test('should do nothing if updateEvery is zero', async function() { t.test('should do nothing if updateEvery is zero', async function() {
const stubInterval = stub() const stubInterval = stub()
stubInterval.throws(new Error('should not be seen')) stubInterval.throws(new Error('should not be seen'))
@ -222,15 +301,19 @@ t.timeout(250).describe('#startAutoupdater()', function() {
}) })
}) })
t.timeout(250).describe('#closeServer()', function() { t.describe('#closeServer()', function() {
let app let app
let cluster
let stubCloseServer let stubCloseServer
t.beforeEach(function() { t.beforeEach(function() {
cluster = {
off: stub(),
}
return createFakeContext() return createFakeContext()
.then(function(res) { .then(function(res) {
let provider = createProvider() let provider = createProvider()
app = new Application(res, provider, 'testapp') app = new Application(res, provider, 'testapp', { cluster: cluster })
app.http.closeServer = stubCloseServer = stub().resolves() app.http.closeServer = stubCloseServer = stub().resolves()
}) })
}) })
@ -244,12 +327,109 @@ t.timeout(250).describe('#closeServer()', function() {
assert.strictEqual(err, assertError) assert.strictEqual(err, assertError)
}) })
t.test('should kill entire cluster if cluster master', async function() {
let workers = {}
let handle = app.__clusterWorkerDied = function() {}
app.config.cluster = Math.floor(Math.random() * (10 - 4 + 1) + 4)
for (let i = 1; i <= app.config.cluster; i++) {
if (i % 2 === 0) {
workers[i] = app.workers[i] = {
process: {
kill: stub().returnWith(function() {
assert.strictEqual(app.__clusterWorkerDied, null)
assert.strictEqual(cluster.off.callCount, 1)
assert.strictEqual(cluster.off.firstCall[0], 'exit')
assert.strictEqual(cluster.off.firstCall[1], handle)
})
}
}
} else {
workers[i] = app.workers[i] = null
}
}
await app.closeServer()
for (let i = 1; i <= app.config.cluster; i++) {
if (workers[i]) {
assert.ok(workers[i].process.kill.called)
assert.strictEqual(app.__clusterWorkerDied, null)
}
}
})
t.test('otherwise should work fine', async function() { t.test('otherwise should work fine', async function() {
await app.closeServer() await app.closeServer()
}) })
}) })
t.timeout(250).describe('#update()', function() { t.describe('#workerDied()', function() {
let app
let stubTimeout
let stubFork
t.beforeEach(function() {
stubTimeout = stub()
return createFakeContext()
.then(function(res) {
let provider = createProvider()
app = new Application(res, provider, 'testapp', { setTimeout: stubTimeout })
app.startForkProcess = stubFork = stub()
})
})
t.test('should do nothing if worker is not found at index', function() {
const assertWorkerId = 2
const assertWorker = { a: 1, w_id: assertWorkerId }
stubTimeout.throws(new Error('should not be seen'))
stubFork.throws(new Error('should not be seen'))
app.workers[1] = assertWorker
app.workerDied(assertWorker)
})
t.test('should mark worker as null and call setTimeout', function() {
const assertWorkerId = 2
const assertTimeoutDuration = 12421
const assertVersion = 'v11.22.33'
const assertWorker = { a: 1, w_id: assertWorkerId }
app.ctx.db.data.core[app.name].active = assertVersion
app.workers[assertWorkerId] = assertWorker
app.clusterWaitOnCrash = assertTimeoutDuration
app.workerDied(assertWorker)
assert.strictEqual(app.workers[assertWorkerId], null)
assert.ok(stubTimeout.called)
assert.strictEqual(typeof(stubTimeout.firstCall[0]), 'function')
assert.strictEqual(stubTimeout.firstCall[1], assertTimeoutDuration)
assert.notOk(stubFork.called)
stubTimeout.firstCall[0]()
assert.ok(stubFork.called)
assert.ok(stubFork.firstCall[0], assertWorkerId)
assert.ok(stubFork.firstCall[1], assertVersion)
})
t.test('setTimeout should not call startForkProcess if one was already started', function() {
const assertWorkerId = 2
const assertWorker = { a: 1, w_id: assertWorkerId }
app.workers[assertWorkerId] = assertWorker
app.workerDied(assertWorker)
assert.strictEqual(app.workers[assertWorkerId], null)
app.workers[assertWorkerId] = { a : 2 }
stubTimeout.firstCall[0]()
assert.notOk(stubFork.called)
})
})
t.describe('#update()', function() {
let ctx let ctx
let app let app
let provider let provider
@ -321,6 +501,23 @@ t.timeout(250).describe('#update()', function() {
assert.strictEqual(stubWrite.callCount, 2) assert.strictEqual(stubWrite.callCount, 2)
}) })
t.test('should do nothing if slave but warn', async function() {
stubWrite.reset().resolves()
app.isSlave = true
app.ctx.log.warn.reset()
assert.notOk(app.ctx.log.warn.called)
let result = await app.update()
assert.strictEqual(result, null)
assert.notOk(ctx.db.data.core.teststatic)
assert.notOk(stubWrite.called)
assert.ok(app.ctx.log.warn.called)
assert.match(app.ctx.log.warn.firstCall[0], /slave/i)
})
t.test('multiple calls should be safe', async function() { t.test('multiple calls should be safe', async function() {
ctx.db.data.core.testapp.updater = '' ctx.db.data.core.testapp.updater = ''
@ -638,7 +835,7 @@ t.timeout(250).describe('#update()', function() {
assert.notOk(stubUpdated.called) assert.notOk(stubUpdated.called)
assert.strictEqual(app.updating, false) assert.strictEqual(app.updating, false)
assert.strictEqual(err, assertError) assert.strictEqual(err, assertError)
assert.strictEqual(stubRunCommand.firstCall[0], 'npm.cmd') assert.strictEqual(stubRunCommand.firstCall[0], util.getNpmExecutable())
assert.ok(stubRunCommand.firstCall[1]) assert.ok(stubRunCommand.firstCall[1])
assert.strictEqual(stubRunCommand.firstCall[1][0], 'install') assert.strictEqual(stubRunCommand.firstCall[1][0], 'install')
assert.ok(stubRunCommand.firstCall[1].includes('--production'), 'should have --production') assert.ok(stubRunCommand.firstCall[1].includes('--production'), 'should have --production')
@ -845,7 +1042,7 @@ t.timeout(250).describe('#update()', function() {
}) })
}) })
t.timeout(250).describe('#registerModule()', function() { t.describe('#registerModule()', function() {
const assertAppName = 'testappregister' const assertAppName = 'testappregister'
let ctx let ctx
let app let app

236
test/core.cluster.test.mjs Normal file
View file

@ -0,0 +1,236 @@
import { Eltro as t, assert, stub } from 'eltro'
import fs from 'fs/promises'
import Core from '../core/core.mjs'
import Util from '../core/util.mjs'
import { createFakeLog } from './helpers.mjs'
import StaticProvider from '../core/providers/static.mjs'
import lowdb from '../core/db.mjs'
const util = new Util(import.meta.url)
const log = createFakeLog()
let db
t.before(function() {
return lowdb({}, log, null).then(function(res) {
db = res
})
})
t.describe('#init()', function() {
const assertProviderName = 'Kyousuu Gakku Gogyou Kikan'
let core
let fakeUtil
let fakeProvider
let fakeProviderConfig
function FakeProvider(config) {
fakeProvider(config)
this.static = true
this.checkConfig = fakeProviderConfig
}
t.beforeEach(function() {
log.error.reset()
core = new Core(db, util, log)
core.util = fakeUtil = {
verifyConfig: stub(),
getAppNames: stub().returns([]),
}
fakeProvider = stub()
fakeProviderConfig = stub()
Core.providers.set(assertProviderName, FakeProvider)
core.isSlave = true
})
t.after(function() {
return Promise.all([
fs.rm('./log_test_1.log', { recursive: true, force: true }).catch(function() {}),
])
})
t.test('should not call provider checkConfig if isSlave', async function() {
const assertAppName = 'Zetsubou'
const assertConfig = {
[assertAppName]: {
provider: assertProviderName,
}
}
db.config = assertConfig
process.env.CLUSTER_APP_NAME = assertAppName
const assertError = new Error('Shousou')
fakeProviderConfig.rejects(assertError)
fakeUtil.getAppNames.returns([assertAppName])
await core.init()
assert.notOk(fakeProviderConfig.called)
})
t.test('should only create an application with the environment app name', async function() {
const assertAppName = 'Yasashii Ketsumatsu'
const assertAppSecondaryName = 'Big Time'
const assertTestString = 'Serozore no Omoi'
const assertConfig = {
[assertAppName]: {
provider: assertProviderName,
teststring: assertTestString,
},
[assertAppSecondaryName]: {
provider: assertProviderName,
teststring: assertTestString,
},
}
db.config = assertConfig
process.env.CLUSTER_APP_NAME = assertAppName
fakeUtil.getAppNames.returns([assertAppSecondaryName, assertAppName])
assert.strictEqual(core.applications.length, 0)
await core.init()
assert.strictEqual(core.applications.length, 1)
assert.ok(core.getApplication(assertAppName))
assert.notOk(core.getApplication(assertAppSecondaryName))
let application = core.getApplication(assertAppName)
assert.strictEqual(core.applications[0], application)
assert.strictEqual(application.name, assertAppName)
assert.strictEqual(application.ctx.db, core.db)
assert.strictEqual(application.ctx.util, core.util)
assert.notStrictEqual(application.ctx.log, core.log)
assert.strictEqual(application.ctx.core, core)
assert.strictEqual(application.config.teststring, assertTestString)
assert.ok(application.fresh)
assert.ok(application.provider instanceof FakeProvider)
})
})
t.describe('#run()', function() {
let core
let testAppOneName
let testAppTwoName
let stubRunApplication
t.beforeEach(function() {
testAppOneName = 'Tenshi'
testAppTwoName = 'no CLOVER'
db.data.core = {
[testAppOneName]: {
versions: []
},
[testAppTwoName]: {
versions: []
},
}
core = new Core(db, util, log)
core.runApplication = stubRunApplication = stub().resolves()
core.isSlave = true
db.write = stub().resolves()
log.info.reset()
log.warn.reset()
log.error.reset()
for (let name of [testAppOneName, testAppTwoName]) {
let onOrOnce = stub()
let app = {
name: name,
fresh: false,
on: onOrOnce,
once: onOrOnce,
ctx: {
log: {
info: stub(),
warn: stub(),
error: stub(),
fatal: stub(),
},
},
runVersion: stub().resolves(),
update: stub().resolves(),
startAutoupdater: stub(),
}
core.applicationMap.set(name, app)
core.applications.push(app)
}
process.env.CLUSTER_APP_NAME = testAppTwoName
})
t.test('should not call update', async function() {
const assertFirstError = new Error('Manatsu')
const assertSecondError = new Error('no Photograph')
core.applicationMap.get(testAppOneName).update.rejects(assertFirstError)
core.applicationMap.get(testAppTwoName).update.rejects(assertSecondError)
await core.run()
assert.notOk(core.applications[0].update.called)
assert.notOk(core.applications[1].update.called)
})
t.test('app.on updated should not be hooked', async function() {
assert.notOk(core.applicationMap.get(testAppOneName).once.called)
assert.notOk(core.applicationMap.get(testAppTwoName).once.called)
assert.notOk(core.applicationMap.get(testAppOneName).on.called)
assert.notOk(core.applicationMap.get(testAppTwoName).on.called)
core.runApplication.returnWith(function(app) {
assert.notOk(app.once.called)
assert.notOk(app.on.called)
return Promise.resolve()
})
await core.run()
assert.notOk(core.applicationMap.get(testAppOneName).once.called)
assert.notOk(core.applicationMap.get(testAppTwoName).once.called)
assert.notOk(core.applicationMap.get(testAppOneName).on.called)
assert.notOk(core.applicationMap.get(testAppTwoName).on.called)
})
t.test('should not call startAutoupdater', async function() {
stubRunApplication.rejects(new Error('not seen'))
assert.notOk(core.applications[0].startAutoupdater.called)
assert.notOk(core.applications[1].startAutoupdater.called)
await core.run()
assert.notOk(core.applications[0].startAutoupdater.called)
assert.notOk(core.applications[1].startAutoupdater.called)
})
t.test('should be safe to call multiple times', async function() {
await core.run()
assert.strictEqual(core.applicationMap.get(testAppOneName).runVersion.callCount, 0)
assert.strictEqual(core.applicationMap.get(testAppTwoName).runVersion.callCount, 1)
await core.run()
await core.run()
await core.run()
await core.run()
assert.strictEqual(core.applicationMap.get(testAppOneName).runVersion.callCount, 0)
assert.strictEqual(core.applicationMap.get(testAppTwoName).runVersion.callCount, 1)
})
t.test('should call runVersion on selected application', async function() {
const assertFirstError = new Error('Manatsu')
const assertSecondError = new Error('no Photograph')
const assertVersion = 'v11.22.33'
core.applicationMap.get(testAppOneName).runVersion.rejects(assertFirstError)
core.applicationMap.get(testAppTwoName).runVersion.rejects(assertSecondError)
process.env.CLUSTER_APP_NAME = testAppTwoName
process.env.CLUSTER_APP_VERSION = assertVersion
let err = await assert.isRejected(core.run())
let app = core.applicationMap.get(testAppTwoName)
assert.strictEqual(core.applicationMap.get(testAppOneName).runVersion.callCount, 0)
assert.strictEqual(app.runVersion.callCount, 1)
assert.strictEqual(app.runVersion.firstCall[0], assertVersion)
assert.strictEqual(app.ctx.log.fatal.callCount, 1)
assert.strictEqual(app.ctx.log.fatal.firstCall[0], assertSecondError)
assert.strictEqual(err, assertSecondError)
})
})

View file

@ -1,6 +1,6 @@
import { Eltro as t, assert} from 'eltro' import { Eltro as t, assert} from 'eltro'
import fs from 'fs/promises' import fs from 'fs/promises'
import http from 'http' import HttpServer from '../core/http.mjs'
import Util from '../core/util.mjs' import Util from '../core/util.mjs'
import { request } from '../core/client.mjs' import { request } from '../core/client.mjs'
import { setTimeout } from 'timers/promises' import { setTimeout } from 'timers/promises'
@ -10,438 +10,520 @@ import getLog from '../core/log.mjs'
const util = new Util(import.meta.url) const util = new Util(import.meta.url)
const port = 61412 const port = 61412
const turnDebuggingOn = false
t.timeout(10000).describe('', function() { const runners = [
let server = null ['runner.mjs', 'testapp'],
let prefix = `http://localhost:${port}/` ['runner_cluster.mjs', 'testappcluster'],
let files = [] ]
let logs = []
let versions = []
let processor
let integrationLog = getLog('test.integration', [])
let compressorPath = util.getPathFromRoot('./7za.exe')
if (process.platform !== 'win32') {
compressorPath = util.getPathFromRoot('./7zas')
}
t.before(function(cb) { runners.forEach(function([runnerName, appname]) {
server = http.createServer(function(req, res) { t.timeout(10000).describe(runnerName, function() {
req.on('error', function(err) { let wasSuccessful = false
integrationLog.error(err, 'error') let http = null
}) let server = null
res.on('error', function(err) { let prefix = `http://localhost:${port}/`
integrationLog.error(err, 'error') let files = [util.getPathFromRoot('./testappcluster')]
}) let logs = []
let allLogs = []
let versions = []
let processor
let integrationLog = getLog('test.integration', [])
let compressorPath = util.getPathFromRoot('./7za.exe')
if (process.platform !== 'win32') {
compressorPath = util.getPathFromRoot('./7zas')
}
integrationLog.info('[SERVER] got request ' + req.url) t.before(function() {
http = new HttpServer()
if (req.url === '/releases') { server = http.createServer(function(req, res) {
res.statusCode = 200 req.on('error', function(err) {
let output = versions.map(x => { integrationLog.error(err, 'error')
return {
name: x[0],
body: x[1],
assets: [{
name: x[2],
browser_download_url: prefix + 'files/' + x[2]
}]
}
}) })
res.end(JSON.stringify(output)); res.on('error', function(err) {
return integrationLog.error(err, 'error')
} else if (req.url.startsWith('/files')) { })
let filename = req.url.substring(req.url.lastIndexOf('/') + 1)
return fs.open(util.getPathFromRoot('./' + filename)) integrationLog.info('[SERVER] got request ' + req.url)
.then(function(file) {
pipeline(file.createReadStream(), res, function(err) { if (req.url === '/releases') {
if (err) { res.statusCode = 200
console.log(err) let output = versions.map(x => {
res.statusCode = 404 return {
res.end(JSON.stringify({ error: 'unknown url' })) name: x[0],
body: x[1],
assets: [{
name: x[2],
browser_download_url: prefix + 'files/' + x[2]
}]
} }
}) })
}).catch(function(err) { res.end(JSON.stringify(output));
console.log(err) return
res.statusCode = 404 } else if (req.url.startsWith('/files')) {
res.end(JSON.stringify({ error: 'unknown url' })) let filename = req.url.substring(req.url.lastIndexOf('/') + 1)
return fs.open(util.getPathFromRoot('./' + filename))
.then(function(file) {
pipeline(file.createReadStream(), res, function(err) {
if (err) {
console.log(err)
res.statusCode = 404
res.end(JSON.stringify({ error: 'unknown url' }))
}
})
}).catch(function(err) {
console.log(err)
res.statusCode = 404
res.end(JSON.stringify({ error: 'unknown url' }))
})
}
res.statusCode = 404
res.end(JSON.stringify({ error: 'unknown url' }))
})
return fs.rm(util.getPathFromRoot('./db.json'), { force: true })
.then(function() {
return server.listenAsync(port)
})
})
t.after(function() {
if (!turnDebuggingOn && !wasSuccessful) {
for (let i = 0; i < allLogs.length; i++) {
prettyPrintMessage(allLogs[i])
}
}
return Promise.all(files.map(function(file) {
return fs.rm(file, { force: true, recursive: true })
}))
.then(function() {
if (processor && !processor.exitCode) {
processor.kill()
return waitUntilClosed()
}
}).then(function() {
return http.closeServer()
})
})
const version_1_stable = `
export function start(http, port, ctx) {
const server = http.createServer(function (req, res) {
res.writeHead(200);
res.end(JSON.stringify({ version: 'v1' }))
})
return server.listenAsync(port, '0.0.0.0')
.then(() => {
ctx.log.info({ port: port, listening: true }, \`Server is listening on \${port} serving v1\`)
}) })
} }
`
res.statusCode = 404 const version_2_nolisten = `
res.end(JSON.stringify({ error: 'unknown url' })) export function start(http, port, ctx) {
}) }
`
fs.rm(util.getPathFromRoot('./db.json'), { force: true }).then(function() { const version_3_crashing = `
server.listen(port, cb) export function start(http, port, ctx) {
}, cb) process.exit(1)
}) }
`
t.after(function() { const version_4_stable = `
return Promise.all(files.map(function(file) { export function start(http, port, ctx) {
return fs.rm(file, { force: true, recursive: true }) const server = http.createServer(function (req, res) {
})) res.writeHead(200);
.then(function() { res.end(JSON.stringify({ version: 'v4' }))
if (processor && !processor.exitCode) {
processor.kill()
}
})
})
const version_1_stable = `
export function start(http, port, ctx) {
const server = http.createServer(function (req, res) {
res.writeHead(200);
res.end(JSON.stringify({ version: 'v1' }))
})
return server.listenAsync(port, '0.0.0.0')
.then(() => {
ctx.log.info({ port: port, listening: true }, \`Server is listening on \${port} serving v1\`)
}) })
}
`
const version_2_nolisten = ` return server.listenAsync(port, '0.0.0.0')
export function start(http, port, ctx) { .then(() => {
} ctx.log.info({ port: port, listening: true }, \`Server is listening on \${port} serving v4\`)
` })
const version_3_crashing = `
export function start(http, port, ctx) {
process.exit(1)
}
`
const version_4_stable = `
export function start(http, port, ctx) {
const server = http.createServer(function (req, res) {
res.writeHead(200);
res.end(JSON.stringify({ version: 'v4' }))
})
return server.listenAsync(port, '0.0.0.0')
.then(() => {
ctx.log.info({ port: port, listening: true }, \`Server is listening on \${port} serving v4\`)
})
}
`
function file(relative) {
let file = util.getPathFromRoot(relative)
files.push(file)
return file
}
function log(message) {
let lines = message.split('\n')
for (let line of lines) {
if (!line.trim()) continue
logs.push(line)
} }
} `
function parseLine(line) { function file(relative) {
if (line[0] === '{') { let file = util.getPathFromRoot(relative)
return JSON.parse(line) files.push(file)
return file
} }
return {
msg: line
}
}
function log(message) {
let logIndex = 0 let lines = message.split('\n')
function catchupLog(ms = 0) { for (let line of lines) {
if (logs.length > logIndex) { if (!line.trim()) continue
for (; logIndex < logs.length; logIndex++) { logs.push(line)
prettyPrintMessage(logs[logIndex]) allLogs.push(line)
} }
} }
if (ms > 0) {
return setTimeout(ms) function parseLine(line) {
if (line[0] === '{') {
return JSON.parse(line)
}
return {
msg: line
}
} }
}
integrationLog.on('newlog', function(record) {
prettyPrintMessage(JSON.stringify(record))
})
let logWaitIndex = 0 let logIndex = 0
function hasLogLine(regMatch) { function catchupLog(ms = 0) {
if (logs.length > logWaitIndex) { if (logs.length > logIndex) {
for (; logWaitIndex < logs.length; logWaitIndex++) { for (; logIndex < logs.length; logIndex++) {
if (turnDebuggingOn) {
prettyPrintMessage(logs[logIndex])
}
}
}
if (ms > 0) {
return setTimeout(ms)
}
}
integrationLog.on('newlog', function(record) {
if (turnDebuggingOn) {
prettyPrintMessage(JSON.stringify(record))
}
})
let logWaitIndex = 0
function hasLogLine(regMatch) {
if (logs.length > logWaitIndex) {
for (; logWaitIndex < logs.length; logWaitIndex++) {
if (typeof(regMatch) === 'function') {
let res = regMatch(parseLine(logs[logWaitIndex]))
if (res) return true
}
else if (logs[logWaitIndex].match(regMatch)) {
return true
}
}
}
return false
}
function findInLogs(regMatch) {
for (let i = 0; i < logs.length; i++) {
if (typeof(regMatch) === 'function') { if (typeof(regMatch) === 'function') {
let res = regMatch(parseLine(logs[logWaitIndex])) let res = regMatch(parseLine(logs[i]))
if (res) return true if (res) return true
} }
else if (logs[logWaitIndex].match(regMatch)) { else if (logs[i].match(regMatch)) {
return true return true
} }
} }
} }
return false
}
function findInLogs(regMatch) { async function waitUntilListening() {
for (let i = 0; i < logs.length; i++) { let listeningLine = null
if (typeof(regMatch) === 'function') { while (processor.exitCode == null
let res = regMatch(parseLine(logs[i])) && !hasLogLine((rec) => { listeningLine = rec; return rec.listening && rec.port })) {
if (res) return true await catchupLog(10)
} }
else if (logs[i].match(regMatch)) {
return true
}
}
}
async function waitUntilListening() {
let listeningLine = null
while (processor.exitCode == null
&& !hasLogLine((rec) => { listeningLine = rec; return rec.listening && rec.port })) {
await catchupLog(10)
}
catchupLog()
if (listeningLine.listening && listeningLine.port) {
return listeningLine
} else {
return null
}
}
async function waitUntilClosed(listening) {
while (true) {
catchupLog() catchupLog()
try { if (listeningLine.listening && listeningLine.port) {
await request({}, `http://localhost:${listening.port}/`) return listeningLine
} catch (err) { } else {
break return null
} }
await setTimeout(25)
}
catchupLog()
logs.splice(0, logs.length); logIndex = 0; logWaitIndex = 0; console.log('\n-------\n')
}
function startRunner() {
return util.runCommandBackground('node', ['runner.mjs'], util.getPathFromRoot('./'), log)
}
t.test('should be fully operational', async function() {
console.log()
let index = file('./index.mjs')
await fs.writeFile(index, version_1_stable)
await util.runCommand(compressorPath, ['a', file('./v1-sc.7z'), index], util.getPathFromRoot('./testapp'))
processor = startRunner()
while (processor.exitCode == null) {
await catchupLog(10)
} }
catchupLog() async function sendRequestToApplication(listening) {
let lastErr = null
let secondLast = parseLine(logs[logs.length - 2]) for (let i = 0; i < 3; i++) {
let last = parseLine(logs[logs.length - 1]) try {
assert.match(secondLast.msg, /creating/i) let checkListening = await request({}, `http://localhost:${listening.port}/`)
assert.match(secondLast.msg, /application/i) return checkListening
assert.match(secondLast.msg, /testapp/i) } catch (err) {
assert.match(secondLast.msg, /0 releases/i) lastErr = err
assert.match(last.err.message, /none/i) await setTimeout(150)
assert.match(last.err.message, /successful/i) }
}
// Reset our log log('-- core.test.integration.mjs crash here --')
logs.splice(0, logs.length); logIndex = 0; logWaitIndex = 0; console.log('\n-------\n') log(lastErr.toString())
throw lastErr
const assertNameVersion1 = 'v1_ok'
file(`./testapp/${assertNameVersion1}`)
versions.splice(0, 0, [assertNameVersion1, 'ok version', 'v1-sc.7z'])
processor = startRunner()
let listening = await waitUntilListening()
let checkListening = await request({}, `http://localhost:${listening.port}/`)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/core is running/)) {
await catchupLog(10)
} }
catchupLog() async function waitUntilClosed(listening) {
while (true) {
catchupLog()
try {
await request({}, `http://localhost:${listening.port}/`)
} catch (err) {
break
}
await setTimeout(25)
}
catchupLog()
let db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) logs.splice(0, logs.length); logIndex = 0; logWaitIndex = 0;
assert.strictEqual(db.core.testapp.active, assertNameVersion1) if (turnDebuggingOn) { console.log('\n-------\n') }
assert.strictEqual(db.core.testapp.versions.length, 1)
assert.strictEqual(db.core.testapp.versions[0].stable, 1)
assert.strictEqual(db.core.testapp.versions[0].installed, true)
// Create our second version
await fs.writeFile(index, version_2_nolisten)
await util.runCommand(compressorPath, ['a', file('./v2-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion2 = 'v2_nolisten'
file(`./testapp/${assertNameVersion2}`)
versions.splice(0, 0, [assertNameVersion2, 'no listen version', 'v2-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Error starting v2_nolisten/)) {
await catchupLog(10)
}
while (!hasLogLine(/restart request.*v2_nolisten.*dirty/)) {
await catchupLog(10)
}
while (processor.exitCode == null) {
await catchupLog(10)
}
catchupLog()
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core.testapp.active, assertNameVersion2)
assert.strictEqual(db.core.testapp.versions.length, 2)
assert.strictEqual(db.core.testapp.versions[0].stable, -1)
assert.strictEqual(db.core.testapp.versions[0].installed, true)
assert.strictEqual(db.core.testapp.versions[1].stable, 1)
assert.strictEqual(db.core.testapp.versions[1].installed, true)
// Since application was in dirty state, on next attempt should attempt to
// run v2 again and then falling back to v1
await waitUntilClosed()
processor = startRunner()
await catchupLog(10)
while (!hasLogLine(/Attempting to run version v2_nolisten/)) {
await catchupLog(10)
}
while (!hasLogLine(/Attempting to run version v1_ok/)) {
await catchupLog(10)
} }
listening = await waitUntilListening() function startRunner() {
assert.ok(listening) return util.runCommandBackground('node', [runnerName], util.getPathFromRoot('./'), log)
checkListening = await request({}, `http://localhost:${listening.port}/`) }
assert.strictEqual(checkListening.body.version, 'v1')
t.test('should be fully operational', async function() {
console.log()
if (!turnDebuggingOn) { console.log('Running empty test') }
let index = file('./index.mjs')
await fs.writeFile(index, version_1_stable)
await util.runCommand(compressorPath, ['a', file('./v1-sc.7z'), index], util.getPathFromRoot('./testapp'))
processor = startRunner()
while (processor.exitCode == null) {
await catchupLog(10)
}
catchupLog()
let secondLast = parseLine(logs[logs.length - 2])
let last = parseLine(logs[logs.length - 1])
assert.match(secondLast.msg, /creating/i)
assert.match(secondLast.msg, /application/i)
assert.match(secondLast.msg, /testapp/i)
assert.match(secondLast.msg, /0 releases/i)
assert.match(last.err.message, /none/i)
assert.match(last.err.message, /successful/i)
// Reset our log
logs.splice(0, logs.length); logIndex = 0; logWaitIndex = 0;
if (turnDebuggingOn) { console.log('\n-------\n') }
const assertNameVersion1 = 'v1_ok'
if (!turnDebuggingOn) { console.log(`Running update ${assertNameVersion1} test`) }
file(`./testapp/${assertNameVersion1}`)
versions.splice(0, 0, [assertNameVersion1, 'ok version', 'v1-sc.7z'])
processor = startRunner()
let listening = await waitUntilListening()
let checkListening = await sendRequestToApplication(listening)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/core is running/)) {
await catchupLog(10)
if (processor.exitCode !== null) {
throw new Error('Process exited with ' + processor.exitCode)
}
}
catchupLog()
let db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion1)
assert.strictEqual(db.core[appname].versions.length, 1)
assert.strictEqual(db.core[appname].versions[0].stable, 1)
assert.strictEqual(db.core[appname].versions[0].installed, true)
// Create our second version
await fs.writeFile(index, version_2_nolisten)
await util.runCommand(compressorPath, ['a', file('./v2-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion2 = 'v2_nolisten'
if (!turnDebuggingOn) { console.log(`Running update ${assertNameVersion2} test`) }
file(`./testapp/${assertNameVersion2}`)
versions.splice(0, 0, [assertNameVersion2, 'no listen version', 'v2-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Error starting v2_nolisten/)) {
await catchupLog(10)
}
if (appname !== 'testappcluster') {
while (!hasLogLine(/restart request.*v2_nolisten.*dirty/)) {
await catchupLog(10)
}
while (processor.exitCode == null) {
await catchupLog(10)
}
catchupLog()
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion2)
assert.strictEqual(db.core[appname].versions.length, 2)
assert.strictEqual(db.core[appname].versions[0].stable, -1)
assert.strictEqual(db.core[appname].versions[0].installed, true)
assert.strictEqual(db.core[appname].versions[1].stable, 1)
assert.strictEqual(db.core[appname].versions[1].installed, true)
// Since application was in dirty state, on next attempt should attempt to
// run v2 again and then falling back to v1
await waitUntilClosed()
if (!turnDebuggingOn) { console.log(`Running fresh ${assertNameVersion2} test`) }
processor = startRunner()
await catchupLog(10)
while (!hasLogLine(/Attempting to run version v2_nolisten/)) {
await catchupLog(10)
}
}
while (!hasLogLine(/Attempting to run version v1_ok/)) {
await catchupLog(10)
}
listening = await waitUntilListening()
assert.ok(listening)
checkListening = await sendRequestToApplication(listening)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/is up and running/)) {
await setTimeout(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion1)
assert.strictEqual(db.core[appname].versions.length, 2)
assert.strictEqual(db.core[appname].versions[0].stable, -2)
assert.strictEqual(db.core[appname].versions[1].stable, 1)
assert.ok(findInLogs(/Attempting to run version v2_nolisten/))
assert.ok(findInLogs(/Error starting v2_nolisten/))
processor.kill()
if (!turnDebuggingOn) { console.log(`Running version stability check test`) }
await waitUntilClosed()
processor = startRunner()
listening = await waitUntilListening()
assert.ok(listening)
checkListening = await sendRequestToApplication(listening)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/is up and running/)) {
await setTimeout(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion1)
assert.strictEqual(db.core[appname].versions.length, 2)
assert.strictEqual(db.core[appname].versions[0].stable, -2)
assert.strictEqual(db.core[appname].versions[1].stable, 1)
assert.notOk(findInLogs(/Attempting to run version v2_nolisten/))
assert.notOk(findInLogs(/Error starting v2_nolisten/))
// Create our third version
await fs.writeFile(index, version_3_crashing)
await util.runCommand(compressorPath, ['a', file('./v3-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion3 = 'v3_crash'
if (!turnDebuggingOn) { console.log(`Running update ${assertNameVersion3} test`) }
file(`./testapp/${assertNameVersion3}`)
versions.splice(0, 0, [assertNameVersion3, 'crash version', 'v3-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Attempting to run version v3_crash/)) {
await catchupLog(10)
}
if (appname !== 'testappcluster') {
while (processor.exitCode == null) {
await catchupLog(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion3)
assert.strictEqual(db.core[appname].versions.length, 3)
assert.strictEqual(db.core[appname].versions[0].stable, -2)
assert.strictEqual(db.core[appname].versions[1].stable, -2)
assert.strictEqual(db.core[appname].versions[2].stable, 1)
catchupLog()
// Should recover afterwards
await waitUntilClosed()
processor = startRunner()
listening = await waitUntilListening()
assert.ok(listening)
checkListening = await sendRequestToApplication(listening)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/core is running/)) {
await setTimeout(10)
}
} else {
while (!hasLogLine(/Attempting to run version v1_ok/)) {
await catchupLog(10)
}
while (!hasLogLine(/is up and running/)) {
await setTimeout(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core[appname].active, assertNameVersion1)
assert.strictEqual(db.core[appname].versions.length, 3)
assert.strictEqual(db.core[appname].versions[0].stable, -2)
assert.strictEqual(db.core[appname].versions[1].stable, -2)
assert.strictEqual(db.core[appname].versions[2].stable, 1)
}
// Create our fourth version
await fs.writeFile(index, version_4_stable)
await util.runCommand(compressorPath, ['a', file('./v4-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion4 = 'v4_stable'
if (!turnDebuggingOn) { console.log(`Running update ${assertNameVersion4} test`) }
file(`./testapp/${assertNameVersion4}`)
versions.splice(0, 0, [assertNameVersion4, 'no listen version', 'v4-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Attempting to run version v4_stable/)) {
await catchupLog(10)
}
while (!hasLogLine(/Server is listening on 31313 serving v4/)) {
await catchupLog(10)
}
catchupLog()
checkListening = await sendRequestToApplication(listening)
assert.strictEqual(checkListening.body.version, 'v4')
while (!hasLogLine(/core is running/)) {
await setTimeout(10) await setTimeout(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core.testapp.active, assertNameVersion1) assert.strictEqual(db.core[appname].active, assertNameVersion4)
assert.strictEqual(db.core.testapp.versions.length, 2) assert.strictEqual(db.core[appname].versions.length, 4)
assert.strictEqual(db.core.testapp.versions[0].stable, -2) assert.strictEqual(db.core[appname].versions[0].stable, 1)
assert.strictEqual(db.core.testapp.versions[1].stable, 1) assert.strictEqual(db.core[appname].versions[1].stable, -2)
wasSuccessful = true
assert.ok(findInLogs(/Attempting to run version v2_nolisten/)) })
assert.ok(findInLogs(/Error starting v2_nolisten/))
processor.kill()
await waitUntilClosed()
processor = startRunner()
listening = await waitUntilListening()
assert.ok(listening)
checkListening = await request({}, `http://localhost:${listening.port}/`)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/core is running/)) {
await setTimeout(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core.testapp.active, assertNameVersion1)
assert.strictEqual(db.core.testapp.versions.length, 2)
assert.strictEqual(db.core.testapp.versions[0].stable, -2)
assert.strictEqual(db.core.testapp.versions[1].stable, 1)
assert.notOk(findInLogs(/Attempting to run version v2_nolisten/))
assert.notOk(findInLogs(/Error starting v2_nolisten/))
// Create our third version
await fs.writeFile(index, version_3_crashing)
await util.runCommand(compressorPath, ['a', file('./v3-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion3 = 'v3_crash'
file(`./testapp/${assertNameVersion3}`)
versions.splice(0, 0, [assertNameVersion3, 'crash version', 'v3-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Attempting to run version v3_crash/)) {
await catchupLog(10)
}
while (processor.exitCode == null) {
await catchupLog(10)
}
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core.testapp.active, assertNameVersion3)
assert.strictEqual(db.core.testapp.versions.length, 3)
assert.strictEqual(db.core.testapp.versions[0].stable, -2)
assert.strictEqual(db.core.testapp.versions[1].stable, -2)
assert.strictEqual(db.core.testapp.versions[2].stable, 1)
catchupLog()
// Should recover afterwards
await waitUntilClosed()
processor = startRunner()
listening = await waitUntilListening()
assert.ok(listening)
checkListening = await request({}, `http://localhost:${listening.port}/`)
assert.strictEqual(checkListening.body.version, 'v1')
while (!hasLogLine(/core is running/)) {
await setTimeout(10)
}
// Create our fourth version
await fs.writeFile(index, version_4_stable)
await util.runCommand(compressorPath, ['a', file('./v4-sc.7z'), index], util.getPathFromRoot('./testapp'))
const assertNameVersion4 = 'v4_stable'
file(`./testapp/${assertNameVersion4}`)
versions.splice(0, 0, [assertNameVersion4, 'no listen version', 'v4-sc.7z'])
// wait a second for it to trigger an update
await setTimeout(500)
while (!hasLogLine(/Attempting to run version v4_stable/)) {
await catchupLog(10)
}
while (!hasLogLine(/Server is listening on 31313 serving v4/)) {
await catchupLog(10)
}
catchupLog()
checkListening = await request({}, `http://localhost:${listening.port}/`)
assert.strictEqual(checkListening.body.version, 'v4')
await setTimeout(10)
db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json')))
assert.strictEqual(db.core.testapp.active, assertNameVersion4)
assert.strictEqual(db.core.testapp.versions.length, 4)
assert.strictEqual(db.core.testapp.versions[0].stable, 1)
assert.strictEqual(db.core.testapp.versions[1].stable, -2)
}) })
}) })

View file

@ -239,6 +239,7 @@ t.describe('#constructor()', function() {
assert.strictEqual(core.restart, assertRestarter) assert.strictEqual(core.restart, assertRestarter)
assert.deepStrictEqual(core.applications, []) assert.deepStrictEqual(core.applications, [])
assert.ok(core.applicationMap) assert.ok(core.applicationMap)
assert.strictEqual(core.isSlave, false)
}) })
}) })
@ -966,7 +967,6 @@ t.describe('#runApplication()', function() {
assert.match(err.message, /found/) assert.match(err.message, /found/)
}) })
t.test('should throw if no version are found', async function() { t.test('should throw if no version are found', async function() {
const assertError = new Error('Daikichi to Rin') const assertError = new Error('Daikichi to Rin')
testApp.runVersion.rejects(assertError) testApp.runVersion.rejects(assertError)
@ -1016,7 +1016,6 @@ t.describe('#runApplication()', function() {
assert.ok(stubWrite.called) assert.ok(stubWrite.called)
}) })
t.test('should succeed if running a minus one on fresh', async function() { t.test('should succeed if running a minus one on fresh', async function() {
const assertError = new Error('Daikichi to Rin') const assertError = new Error('Daikichi to Rin')
let count = 0 let count = 0

View file

@ -77,7 +77,7 @@ export function prettyPrintMessage(line) {
if (line[0] === '{') { if (line[0] === '{') {
try { try {
let rec = JSON.parse(line) let rec = JSON.parse(line)
console.log(`[${rec.time.substr(11).replace('Z', '')}] ${style(levels[rec.level], levelcolor[rec.level])}: ${rec.name}: ${style(rec.msg, 'cyan')}`) console.log(`[${rec.time.substr(11).replace('Z', '')}] ${style(levels[rec.level], levelcolor[rec.level])}: ${rec.name} ${style('(pid: ' + rec.pid + ')', 'grey')}: ${style(rec.msg, 'cyan')}`)
if (rec.err && rec.err.message && rec.err.stack) { if (rec.err && rec.err.message && rec.err.stack) {
let err = new Error(rec.err.message) let err = new Error(rec.err.message)
err.stack = rec.err.stack err.stack = rec.err.stack

36
test/runner_cluster.mjs Normal file
View file

@ -0,0 +1,36 @@
import fs from 'fs'
import cluster from 'cluster'
import { runner } from '../core/runner.mjs'
if (cluster.isPrimary) {
try {
fs.rmSync(util.getPathFromRoot('./db.json'))
} catch {}
}
runner(import.meta.url, {
name: 'test-runner-cluster',
testappcluster: {
cluster: 2,
port: 31313,
provider: 'git',
url: 'http://localhost:61412/releases',
updateEvery: 0.014,
heartbeatTimeout: 100,
heartbeatAttempts: 3,
heartbeatAttemptsWait: 100,
log: [{
stream: 'process.stdout',
level: 'info',
}],
}
}, 'db.json')
.then(
function(core) {
core.log.info('core is running')
},
function(err) {
runner.log.error(err, 'Error starting runner')
process.exit(1)
}
)

View file

@ -195,6 +195,35 @@ t.describe('#getApplications()', function() {
assert.deepStrictEqual(util.getAppNames(getBase({ heartbeatAttemptsWait: {} })), []) assert.deepStrictEqual(util.getAppNames(getBase({ heartbeatAttemptsWait: {} })), [])
}) })
t.test('should fail to find if clusterWaitOnCrash is defined but not a valid number', function() {
assert.deepStrictEqual(util.getAppNames(getBase()), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: null })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: 5 })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: 15 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: 1000 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: 'asdf' })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: '1234' })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: 0 })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: -5 })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: [] })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ clusterWaitOnCrash: {} })), [])
})
t.test('should fail to find if cluster is defined but not a valid number', function() {
assert.deepStrictEqual(util.getAppNames(getBase()), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: null })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 1 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 5 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 15 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 1000 })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 'asdf' })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: '1234' })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: 0 })), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: -5 })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: [] })), [])
assert.deepStrictEqual(util.getAppNames(getBase({ cluster: {} })), [])
})
t.test('should fail to find if heartbeatPath is defined but not a valid string', function() { t.test('should fail to find if heartbeatPath is defined but not a valid string', function() {
assert.deepStrictEqual(util.getAppNames(getBase()), ['app']) assert.deepStrictEqual(util.getAppNames(getBase()), ['app'])
assert.deepStrictEqual(util.getAppNames(getBase({ heartbeatPath: null })), ['app']) assert.deepStrictEqual(util.getAppNames(getBase({ heartbeatPath: null })), ['app'])