From 618cfd045148c48d6012cc6ba0522704e3b0518a Mon Sep 17 00:00:00 2001 From: Jonatan Nilsson Date: Fri, 1 Apr 2022 09:59:47 +0000 Subject: [PATCH] log: Workers in cluster will now properly notify master of newlog entries. Useful for sc-manager --- core/core.mjs | 37 ++++++++- core/log.mjs | 9 +++ package.json | 2 +- test/core.test.integration.mjs | 134 +++++++++++++++++++++++---------- test/core.test.mjs | 103 +++++++++++++++++++++++++ test/helpers.mjs | 2 +- test/runner_cluster.mjs | 13 ++++ 7 files changed, 257 insertions(+), 43 deletions(-) diff --git a/core/core.mjs b/core/core.mjs index 9c6aaa7..27e190c 100644 --- a/core/core.mjs +++ b/core/core.mjs @@ -61,6 +61,8 @@ export default class Core { this.log.info(`Found applications: ${names.join(', ')}.`) + let hasCluster = false + for (let name of names) { if (this.isSlave && process.env.CLUSTER_APP_NAME !== name) { continue @@ -82,16 +84,49 @@ export default class Core { let application = new Application({ db: this.db, util: this.util, - log: getLog(logName, this.db.config[name].log || null), + log: getLog(logName, this.db.config[name].log || null, { name: name }), core: this, }, provider, name) this.applications.push(application) this.applicationMap.set(name, application) + + if (this.db.config[name].cluster) { + hasCluster = true + } } catch (err) { this.log.error(err, `Error creating application ${name} with provider ${this.db.config[name].provider}: ${err.message}`) } } + if (hasCluster && !this.isSlave) { + cluster.on('message', (worker, message) => { + // Some sanity checking + if (!message + || typeof(message) !== 'object' + || typeof(message.apptarget) !== 'string' + || typeof(message.type) !== 'string' + || typeof(message.payload) !== 'object' + || !message.payload + ) { + return + } + + let app = this.getApplication(message.apptarget) + let targetLog = null + if (app) { + targetLog = app.ctx.log + } else if (message.apptarget === this.db.config.name) { + targetLog = this.log + } + + if (!targetLog) return + + if (message.type === 'newlog') { + targetLog.emit('newlog', message.payload) + } + }) + } + if (names.length && !this.applications.length) { return Promise.reject(new Error('None of the application were successful in running')) } diff --git a/core/log.mjs b/core/log.mjs index 62dbb67..53973e3 100644 --- a/core/log.mjs +++ b/core/log.mjs @@ -1,4 +1,5 @@ // import nodewindows from 'node-windows' +import cluster from 'cluster' import bunyan from 'bunyan-lite' import { setTimeout } from 'timers/promises' @@ -54,6 +55,14 @@ export default function getLog(name, streams = null, opts = {}) { stream: { write: function(record) { logger.emit('newlog', record) + + if (cluster.isWorker) { + process.send({ + apptarget: opts.name || name, + type: 'newlog', + payload: record, + }) + } }, end: function() {}, destroy: function() {}, diff --git a/package.json b/package.json index 860062d..6a9e5db 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "service-core", - "version": "3.0.0-beta.14", + "version": "3.0.0-beta.15", "description": "Core boiler plate code to install node server as windows service", "main": "index.mjs", "scripts": { diff --git a/test/core.test.integration.mjs b/test/core.test.integration.mjs index 92fe016..d9256ec 100644 --- a/test/core.test.integration.mjs +++ b/test/core.test.integration.mjs @@ -166,7 +166,9 @@ runners.forEach(function([runnerName, appname]) { function parseLine(line) { if (line[0] === '{') { - return JSON.parse(line) + try { + return JSON.parse(line) + } catch {} } return { msg: line @@ -187,6 +189,24 @@ runners.forEach(function([runnerName, appname]) { return setTimeout(ms) } } + + async function safeTry(func) { + let lastException = null + for (let i = 0; i < 3; i++) { + if (i > 0) { + allLogs.push('[safeTry] Failed with error ' + lastException.message + ', trying agian') + await setTimeout(500) + } + try { + await func() + return + } + catch (err) { + lastException = err + } + } + throw lastException + } integrationLog.on('newlog', function(record) { allLogs.push(JSON.stringify(record)) @@ -275,6 +295,7 @@ runners.forEach(function([runnerName, appname]) { } t.test('should be fully operational', async function() { + let db; console.log() if (!turnDebuggingOn) { console.log('Running empty test') } @@ -329,11 +350,13 @@ runners.forEach(function([runnerName, appname]) { catchupLog() - let db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion1) - assert.strictEqual(db.core[appname].versions.length, 1) - assert.strictEqual(db.core[appname].versions[0].stable, 1) - assert.strictEqual(db.core[appname].versions[0].installed, true) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion1) + assert.strictEqual(db.core[appname].versions.length, 1) + assert.strictEqual(db.core[appname].versions[0].stable, 1) + assert.strictEqual(db.core[appname].versions[0].installed, true) + }) // Create our second version await fs.writeFile(index, version_2_nolisten) @@ -362,13 +385,15 @@ runners.forEach(function([runnerName, appname]) { } catchupLog() - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion2) - assert.strictEqual(db.core[appname].versions.length, 2) - assert.strictEqual(db.core[appname].versions[0].stable, -1) - assert.strictEqual(db.core[appname].versions[0].installed, true) - assert.strictEqual(db.core[appname].versions[1].stable, 1) - assert.strictEqual(db.core[appname].versions[1].installed, true) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion2) + assert.strictEqual(db.core[appname].versions.length, 2) + assert.strictEqual(db.core[appname].versions[0].stable, -1) + assert.strictEqual(db.core[appname].versions[0].installed, true) + assert.strictEqual(db.core[appname].versions[1].stable, 1) + assert.strictEqual(db.core[appname].versions[1].installed, true) + }) // Since application was in dirty state, on next attempt should attempt to // run v2 again and then falling back to v1 @@ -397,11 +422,13 @@ runners.forEach(function([runnerName, appname]) { await setTimeout(10) } - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion1) - assert.strictEqual(db.core[appname].versions.length, 2) - assert.strictEqual(db.core[appname].versions[0].stable, -2) - assert.strictEqual(db.core[appname].versions[1].stable, 1) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion1) + assert.strictEqual(db.core[appname].versions.length, 2) + assert.strictEqual(db.core[appname].versions[0].stable, -2) + assert.strictEqual(db.core[appname].versions[1].stable, 1) + }) assert.ok(findInLogs(/Attempting to run version v2_nolisten/)) assert.ok(findInLogs(/Error starting v2_nolisten/)) @@ -424,11 +451,13 @@ runners.forEach(function([runnerName, appname]) { await setTimeout(10) } - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion1) - assert.strictEqual(db.core[appname].versions.length, 2) - assert.strictEqual(db.core[appname].versions[0].stable, -2) - assert.strictEqual(db.core[appname].versions[1].stable, 1) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion1) + assert.strictEqual(db.core[appname].versions.length, 2) + assert.strictEqual(db.core[appname].versions[0].stable, -2) + assert.strictEqual(db.core[appname].versions[1].stable, 1) + }) assert.notOk(findInLogs(/Attempting to run version v2_nolisten/)) assert.notOk(findInLogs(/Error starting v2_nolisten/)) @@ -455,12 +484,14 @@ runners.forEach(function([runnerName, appname]) { await catchupLog(10) } - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion3) - assert.strictEqual(db.core[appname].versions.length, 3) - assert.strictEqual(db.core[appname].versions[0].stable, -2) - assert.strictEqual(db.core[appname].versions[1].stable, -2) - assert.strictEqual(db.core[appname].versions[2].stable, 1) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion3) + assert.strictEqual(db.core[appname].versions.length, 3) + assert.strictEqual(db.core[appname].versions[0].stable, -2) + assert.strictEqual(db.core[appname].versions[1].stable, -2) + assert.strictEqual(db.core[appname].versions[2].stable, 1) + }) catchupLog() @@ -486,12 +517,14 @@ runners.forEach(function([runnerName, appname]) { await setTimeout(10) } - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion1) - assert.strictEqual(db.core[appname].versions.length, 3) - assert.strictEqual(db.core[appname].versions[0].stable, -2) - assert.strictEqual(db.core[appname].versions[1].stable, -2) - assert.strictEqual(db.core[appname].versions[2].stable, 1) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion1) + assert.strictEqual(db.core[appname].versions.length, 3) + assert.strictEqual(db.core[appname].versions[0].stable, -2) + assert.strictEqual(db.core[appname].versions[1].stable, -2) + assert.strictEqual(db.core[appname].versions[2].stable, 1) + }) } // Create our fourth version @@ -525,11 +558,32 @@ runners.forEach(function([runnerName, appname]) { await setTimeout(10) - db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) - assert.strictEqual(db.core[appname].active, assertNameVersion4) - assert.strictEqual(db.core[appname].versions.length, 4) - assert.strictEqual(db.core[appname].versions[0].stable, 1) - assert.strictEqual(db.core[appname].versions[1].stable, -2) + await safeTry(async function() { + db = JSON.parse(await fs.readFile(util.getPathFromRoot('./db.json'))) + assert.strictEqual(db.core[appname].active, assertNameVersion4) + assert.strictEqual(db.core[appname].versions.length, 4) + assert.strictEqual(db.core[appname].versions[0].stable, 1) + assert.strictEqual(db.core[appname].versions[1].stable, -2) + }) + + if (appname === 'testappcluster') { + let foundCore = false + let foundWorker = false + for (let line of allLogs) { + if (line.startsWith('[FROMWORKERCORE] test-runner-cluster')) { + foundCore = true + } + else if (line.startsWith('[FROMWORKERAPP] testappcluster-1')) { + foundWorker = true + } + if (foundCore && foundWorker) { + break + } + } + + assert.ok(foundCore) + assert.ok(foundWorker) + } wasSuccessful = true }) }) diff --git a/test/core.test.mjs b/test/core.test.mjs index 0c8a1be..88a1f1f 100644 --- a/test/core.test.mjs +++ b/test/core.test.mjs @@ -1,3 +1,4 @@ +import cluster from 'cluster' import { Eltro as t, assert, stub } from 'eltro' import fs from 'fs/promises' import Core from '../core/core.mjs' @@ -439,6 +440,108 @@ t.describe('#init()', function() { assert.strictEqual(application.ctx.log.streams[1].level, 40) assert.strictEqual(application.ctx.log.streams[1].type, 'stream') }) + + t.test('should listen on cluster messages if one or more are cluster on', async function() { + const assertAppName1 = 'Dai Sagara Yoshiharu' + const assertAppName2 = 'Kuryo' + const assertCoreName = 'Ichuu' + const assertPayload1 = { a: 1 } + const assertPayload2 = { b: 2 } + const assertConfig = { + name: assertCoreName, + [assertAppName1]: { + provider: assertProviderName, + cluster: 2, + }, + [assertAppName2]: { + provider: assertProviderName, + cluster: 1, + }, + } + db.config = assertConfig + fakeUtil.getAppNames.returns([assertAppName1, assertAppName2]) + assert.strictEqual(core.applications.length, 0) + + await core.init() + + core.log.emit = stub() + + assert.strictEqual(core.applications.length, 2) + + let app1 = core.getApplication(assertAppName1) + let app2 = core.getApplication(assertAppName2) + + app1.ctx.log.emit = stub() + app2.ctx.log.emit = stub() + + cluster.emit('message', null, { + apptarget: app1.name, + type: 'newlog', + payload: assertPayload1 + }) + + assert.notOk(core.log.emit.called) + assert.ok(app1.ctx.log.emit.called) + assert.ok(app1.ctx.log.emit.firstCall[0], 'newlog') + assert.ok(app1.ctx.log.emit.firstCall[1], assertPayload1) + assert.notOk(app2.ctx.log.emit.called) + + app1.ctx.log.emit.reset() + + cluster.emit('message', null, { + apptarget: app2.name, + type: 'newlog', + payload: assertPayload2 + }) + + assert.notOk(core.log.emit.called) + assert.notOk(app1.ctx.log.emit.called) + assert.ok(app2.ctx.log.emit.called) + assert.ok(app2.ctx.log.emit.firstCall[0], 'newlog') + assert.ok(app2.ctx.log.emit.firstCall[1], assertPayload2) + + app2.ctx.log.emit.reset() + + let tests = [ + null, + undefined, + 12412, + 'asdfag', + {}, + { apptarget: 12421, type: 'newlog', payload: {}}, + { apptarget: {}, type: 'newlog', payload: {}}, + { apptarget: null, type: 'newlog', payload: {}}, + { type: 'newlog', payload: {}}, + { apptarget: app1.name, type: 12421, payload: {}}, + { apptarget: app1.name, type: {}, payload: {}}, + { apptarget: app1.name, type: null, payload: {}}, + { apptarget: app1.name, payload: {}}, + { apptarget: app1.name, type: 'newlog', payload: 12421}, + { apptarget: app1.name, type: 'newlog', payload: null}, + { apptarget: app1.name, type: 'newlog', payload: 'test'}, + ] + + tests.forEach(function(test) { + cluster.emit('message', null, test) + + assert.notOk(core.log.emit.called) + assert.notOk(app1.ctx.log.emit.called) + assert.notOk(app2.ctx.log.emit.called) + }) + + cluster.emit('message', null, { + apptarget: assertCoreName, + type: 'newlog', + payload: assertPayload1 + }) + + assert.notOk(app1.ctx.log.emit.called) + assert.notOk(app2.ctx.log.emit.called) + assert.ok(core.log.emit.called) + assert.ok(core.log.emit.called) + assert.ok(core.log.emit.firstCall[0], 'newlog') + assert.ok(core.log.emit.firstCall[1], assertPayload1) + }) }) t.describe('#run()', function() { diff --git a/test/helpers.mjs b/test/helpers.mjs index 54df4fc..97d7085 100644 --- a/test/helpers.mjs +++ b/test/helpers.mjs @@ -86,7 +86,7 @@ export function prettyPrintMessage(line) { console.log(err) } return - } catch (err){ console.log(err)} + } catch { } } console.log(line) } diff --git a/test/runner_cluster.mjs b/test/runner_cluster.mjs index c8a501c..0e46f65 100644 --- a/test/runner_cluster.mjs +++ b/test/runner_cluster.mjs @@ -27,6 +27,19 @@ runner(import.meta.url, { }, 'db.json') .then( function(core) { + if (cluster.isPrimary) { + let app = core.applications[0] + app.ctx.log.on('newlog', function(record) { + if (record.name !== app.name) { + console.log(`[FROMWORKERAPP] ${record.name} (${record.pid}) ${record.msg}`) + } + }) + core.log.on('newlog', function(record) { + if (record.pid !== process.pid) { + console.log(`[FROMWORKERCORE] ${record.name} (${record.pid}) ${record.msg}`) + } + }) + } core.log.info('core is running') }, function(err) {