From 123d493d275b4a77bc986800abaf5e5c482801d1 Mon Sep 17 00:00:00 2001 From: Dave Pacheco Date: Fri, 11 May 2012 16:22:28 -0700 Subject: [PATCH 1/4] sort output from multiple sources --- bin/bunyan | 179 ++++++++++++++++++++++++++++++++++++----------- test/cli.test.js | 19 +++++ 2 files changed, 156 insertions(+), 42 deletions(-) diff --git a/bin/bunyan b/bin/bunyan index 2ce958c..3bad2e9 100755 --- a/bin/bunyan +++ b/bin/bunyan @@ -28,7 +28,7 @@ var OM_FROM_NAME = { "json": OM_JSON, "inspect": OM_INSPECT, "simple": OM_SIMPLE -} +}; // Levels @@ -58,8 +58,6 @@ Object.keys(levelFromName).forEach(function (name) { name.length === 4 ? ' ' : '') + name.toUpperCase(); }); - - //---- support functions function getVersion() { @@ -108,7 +106,7 @@ if (!format) { } function indent(s) { - return ' ' + s.split(/\r?\n/).join('\n ') + return ' ' + s.split(/\r?\n/).join('\n '); } function objCopy(obj) { @@ -141,7 +139,7 @@ function printHelp() { util.puts(" --no-color Force no coloring (e.g. terminal doesn't support it)"); util.puts(" -o, --output MODE"); util.puts(" Specify an output mode/format. One of"); - util.puts(" paul: (the default) pretty") + util.puts(" paul: (the default) pretty"); util.puts(" json: JSON output, 2-space indent"); util.puts(" json-N: JSON output, N-space indent, e.g. 'json-4'"); util.puts(" inspect: node.js `util.inspect` output"); @@ -150,6 +148,91 @@ function printHelp() { util.puts("See for more complete docs."); } +/* + * If the user specifies multiple input sources, we want to print out records + * from all sources in a single, chronologically ordered stream. To do this + * efficiently, we first assume that all records within each source are ordered + * already, so we need only keep track of the next record in each source and + * the time of the last record emitted. To avoid excess memory usage, we + * pause() streams that are ahead of others. + * + * "streams" is an object indexed by source name (file name) which specifies: + * + * stream Actual stream object, so that we can pause and resume it. + * + * records Array of log records we've read, but not yet emitted. Each + * record includes "line" (the raw line), "rec" (the JSON + * record), and "time" (the parsed time value). + * + * done Whether the stream has any more records to emit. + */ +var streams = {}; + +function gotRecord(file, line, rec, opts, stylize) +{ + var time = new Date(rec.time); + + streams[file]['records'].push({ line: line, rec: rec, time: time }); + emitNextRecord(opts, stylize); +} + +function emitNextRecord(opts, stylize) +{ + var ofile, ready, minfile, rec; + + for (;;) { + /* + * Take a first pass through the input streams to see if we have a record + * from all of them. If not, we'll pause any streams for which we do + * already have a record (to avoid consuming excess memory) and then wait + * until we have records from the others before emitting the next record. + * + * As part of the same pass, we look for the earliest record we have not yet + * emitted. + */ + minfile = undefined; + ready = true; + for (ofile in streams) { + + if (streams[ofile].stream === null || + (!streams[ofile].done && streams[ofile].records.length === 0)) { + ready = false; + break; + } + + if (streams[ofile].records.length > 0 && (minfile === undefined || + streams[minfile].records[0].time > streams[ofile].records[0].time)) { + minfile = ofile; + } + } + + if (!ready) { + for (ofile in streams) { + if (!streams[ofile].stream || streams[ofile].done) + continue; + + if (streams[ofile].records.length > 0) { + if (!streams[ofile].paused) { + streams[ofile].paused = true; + streams[ofile].stream.pause(); + } + } else if (streams[ofile].paused) { + streams[ofile].paused = false; + streams[ofile].stream.resume(); + } + } + + return; + } + + /* + * Emit the next record for "minfile", and invoke ourselves again to make + * sure we emit as many records as we can right now. + */ + rec = streams[minfile].records.shift(); + emitRecord(rec.rec, rec.line, opts, stylize); + } +} /** * Parse the command-line options and arguments into an object. @@ -180,7 +263,7 @@ function parseArgv(argv) { if (args[i].charAt(0) === "-" && args[i].charAt(1) !== '-' && args[i].length > 2) { var splitOpts = args[i].slice(1).split(""); for (var j = 0; j < splitOpts.length; j++) { - newArgs.push('-' + splitOpts[j]) + newArgs.push('-' + splitOpts[j]); if (optTakesArg[splitOpts[j]]) { var optArg = splitOpts.slice(j+1).join(""); if (optArg.length) { @@ -195,7 +278,7 @@ function parseArgv(argv) { } args = newArgs; - endOfOptions = false; + var endOfOptions = false; while (args.length > 0) { var arg = args.shift(); switch(arg) { @@ -291,7 +374,7 @@ function stylizeWithoutColor(str, color) { /** - * Is this a valid Bunyna log record. + * Is this a valid Bunyan log record. */ function isValidRecord(rec) { if (rec.v == null || @@ -310,10 +393,11 @@ function isValidRecord(rec) { /** - * Print out a single result, considering input options. + * Parses the given log line and either emits it right away (for invalid + * records) or enqueues it for emitting later when it's the next line to show. */ -function handleLogLine(line, opts, stylize) { - // Handle non-JSON lines. +function handleLogLine(file, line, opts, stylize) { + // Emit non-JSON lines immediately. var rec; if (!line) { return emit(line + '\n'); @@ -327,6 +411,16 @@ function handleLogLine(line, opts, stylize) { } } + if (file === null || !isValidRecord(rec)) + return emitRecord(rec, line, opts, stylize); + + return gotRecord(file, line, rec, opts, stylize); +} + +/** + * Print out a single result, considering input options. + */ +function emitRecord(rec, line, opts, stylize) { switch (opts.outputMode) { case OM_PAUL: // [time] LEVEL: name[/component]/pid on hostname (src): msg* (extras...) @@ -367,8 +461,8 @@ function handleLogLine(line, opts, stylize) { 40: 'magenta', // WARN 50: 'red', // ERROR 60: 'inverse', // FATAL - } - level = stylize(level, colorFromLevel[rec.level]) + }; + level = stylize(level, colorFromLevel[rec.level]); } delete rec.level; @@ -414,7 +508,7 @@ function handleLogLine(line, opts, stylize) { details.push(indent(format("%s %s HTTP/1.1\n%s", rec.req.method, rec.req.url, Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n')))); + function (h) { return h + ': ' + headers[h]; }).join('\n')))); } delete rec.req; @@ -431,14 +525,14 @@ function handleLogLine(line, opts, stylize) { rec.client_req.url, hostHeaderLine, Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n')))); + function (h) { return h + ': ' + headers[h]; }).join('\n')))); } delete rec.client_req; if (rec.res) { var s = ''; if (rec.res.header) { - s += rec.res.header.trimRight() + s += rec.res.header.trimRight(); } else if (rec.res.headers) { if (rec.res.statusCode) { s += format("HTTP/1.1 %s %s\n", rec.res.statusCode, @@ -446,7 +540,7 @@ function handleLogLine(line, opts, stylize) { } var headers = rec.res.headers; s += Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n'); + function (h) { return h + ': ' + headers[h]; }).join('\n'); } if (s) { details.push(indent(s)); @@ -455,7 +549,7 @@ function handleLogLine(line, opts, stylize) { delete rec.res; if (rec.err && rec.err.stack) { - details.push(indent(rec.err.stack)) + details.push(indent(rec.err.stack)); delete rec.err; } @@ -463,19 +557,18 @@ function handleLogLine(line, opts, stylize) { for (var i = 0; i < leftover.length; i++) { var key = leftover[i]; var value = rec[key]; - var type = typeof(value); var stringified = false; if (typeof(value) !== 'string') { value = JSON.stringify(value, null, 2); stringified = true; } if (value.indexOf('\n') !== -1 || value.length > 50) { - details.push(indent(key + ': ' + value)) + details.push(indent(key + ': ' + value)); } else if (!stringified && (value.indexOf(' ') != -1 || value.length === 0)) { - extras.push(key + '=' + JSON.stringify(value)) + extras.push(key + '=' + JSON.stringify(value)); } else { - extras.push(key + '=' + value) + extras.push(key + '=' + value); } } @@ -531,7 +624,7 @@ process.stdout.on("error", function (err) { if (err.code === "EPIPE") { // Pass. See . } else { - warn(err) + warn(err); drainStdoutAndExit(1); } }); @@ -578,17 +671,17 @@ function processStdin(opts, stylize, callback) { } if (length > 1) { - handleLogLine(leftover + lines[0], opts, stylize); + handleLogLine(null, leftover + lines[0], opts, stylize); } leftover = lines.pop(); length -= 1; for (var i=1; i < length; i++) { - handleLogLine(lines[i], opts, stylize); + handleLogLine(null, lines[i], opts, stylize); } }); stdin.on('end', function () { if (leftover) { - handleLogLine(leftover, opts, stylize); + handleLogLine(null, leftover, opts, stylize); leftover = ''; } callback(); @@ -607,9 +700,11 @@ function processStdin(opts, stylize, callback) { function processFile(file, opts, stylize, callback) { var stream = fs.createReadStream(file, {encoding: 'utf8'}); + streams[file].stream = stream; + stream.on('error', function (err) { callback(err); - }) + }); var leftover = ''; // Left-over partial line from last chunk. stream.on('data', function (chunk) { @@ -621,20 +716,21 @@ function processFile(file, opts, stylize, callback) { } if (length > 1) { - handleLogLine(leftover + lines[0], opts, stylize); + handleLogLine(file, leftover + lines[0], opts, stylize); } leftover = lines.pop(); length -= 1; for (var i=1; i < length; i++) { - handleLogLine(lines[i], opts, stylize); + handleLogLine(file, lines[i], opts, stylize); } }); stream.on('end', function () { if (leftover) { - handleLogLine(leftover, opts, stylize); + handleLogLine(file, leftover, opts, stylize); leftover = ''; } + streams[file].done = true; callback(); }); } @@ -643,13 +739,14 @@ function processFile(file, opts, stylize, callback) { /** * From node async module. */ -function asyncForEachSeries(arr, iterator, callback) { +function asyncForEach(arr, iterator, callback) { + callback = callback || function () {}; if (!arr.length) { return callback(); } var completed = 0; - var iterate = function () { - iterator(arr[completed], function (err) { + arr.forEach(function (x) { + iterator(x, function (err) { if (err) { callback(err); callback = function () {}; @@ -659,13 +756,9 @@ function asyncForEachSeries(arr, iterator, callback) { if (completed === arr.length) { callback(); } - else { - iterate(); - } } }); - }; - iterate(); + }); }; @@ -677,7 +770,7 @@ function main(argv) { try { opts = parseArgv(argv); } catch (e) { - warn("bunyan: error: %s", e.message) + warn("bunyan: error: %s", e.message); return drainStdoutAndExit(1); } if (opts.help) { @@ -693,8 +786,10 @@ function main(argv) { var retval = 0; if (opts.args.length > 0) { var files = opts.args; - var i = 0; - asyncForEachSeries(files, + files.forEach(function (file) { + streams[file] = { stream: null, records: [], done: false } + }); + asyncForEach(files, function (file, next) { processFile(file, opts, stylize, function (err) { if (err) { @@ -715,7 +810,7 @@ function main(argv) { } else { processStdin(opts, stylize, function () { process.exit(retval); - }) + }); } } diff --git a/test/cli.test.js b/test/cli.test.js index bf1fe9e..9ba871b 100644 --- a/test/cli.test.js +++ b/test/cli.test.js @@ -149,3 +149,22 @@ test('simple.log doesnotexist1.log doesnotexist2.log', function (t) { } ); }); + +test('multiple logs', function (t) { + exec(BUNYAN + ' corpus/log1.log corpus/log2.log', + function (err, stdout, stderr) { + t.error(err); + t.equal(stdout, [ + '[2012-05-08T16:57:55.586Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T16:58:55.586Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:01:49.339Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:02:47.404Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:02:49.339Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:57.404Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:08:01.105Z] INFO: agent2/76156 on headnode: message\n', + ].join('')); + t.end(); + }); +}); From f1996fccef21fa0683d7127da6a3e8b1caab02fc Mon Sep 17 00:00:00 2001 From: Dave Pacheco Date: Tue, 19 Jun 2012 14:36:02 -0700 Subject: [PATCH 2/4] add ring buffer stream --- README.md | 30 +++++++++++++++++++++++ TODO.md | 1 - examples/ringbuffer.js | 11 +++++++++ lib/bunyan.js | 54 +++++++++++++++++++++++++++++++++++++++++ test/ringbuffer.test.js | 38 +++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 examples/ringbuffer.js create mode 100644 test/ringbuffer.test.js diff --git a/README.md b/README.md index b8083b7..14c794b 100644 --- a/README.md +++ b/README.md @@ -459,6 +459,36 @@ Supported stream types are: // Handle stream write or create error here. }); +Bunyan comes with a special stream called a RingBuffer which keeps the last N +records in memory and does *not* write the data anywhere else. One common +strategy is to log 'info' and higher to a normal log file but log all records +(including 'trace') to a ringbuffer that you can access via a debugger, or your +own HTTP interface, or a post-mortem facility like MDB or node-panic. + +To use a RingBuffer: + + /* Create a ring buffer that stores the last 100 entries. */ + var bunyan = require('bunyan'); + var ringbuffer = new bunyan.RingBuffer({ limit: 100 }); + var log = new bunyan({ + name: "foo", + stream: ringbuffer, + level: "debug" + }); + + log.info('hello world'); + console.log(ringbuffer.entries); + +This example emits: + + [ { name: 'foo', + hostname: '912d2b29', + pid: 50346, + level: 30, + msg: 'hello world', + time: '2012-06-19T21:34:19.906Z', + v: 0 } ] + # License diff --git a/TODO.md b/TODO.md index 58d598f..4292142 100644 --- a/TODO.md +++ b/TODO.md @@ -15,7 +15,6 @@ works *and* that an existing field in the parent is not *re-serialized*. - a "rolling-file" stream: but specifically by time, e.g. hourly. (MarkC requested) -- ringBuffer stream - split out `bunyan` cli to a "bunyan" or "bunyan-reader" or "node-bunyan-reader" as the basis for tools to consume bunyan logs. It can grow indep of node-bunyan for generating the logs. diff --git a/examples/ringbuffer.js b/examples/ringbuffer.js new file mode 100644 index 0000000..06d4cb3 --- /dev/null +++ b/examples/ringbuffer.js @@ -0,0 +1,11 @@ +/* Create a ring buffer that stores the last 100 entries. */ +var bunyan = require('..'); +var ringbuffer = new bunyan.RingBuffer({ limit: 100 }); +var log = new bunyan({ + name: 'foo', + stream: ringbuffer, + level: 'debug' +}); + +log.info('hello world'); +console.log(ringbuffer.entries); diff --git a/lib/bunyan.js b/lib/bunyan.js index aa294f3..52b5f99 100644 --- a/lib/bunyan.js +++ b/lib/bunyan.js @@ -954,6 +954,58 @@ var errSerializer = Logger.stdSerializers.err = function err(err) { }; +/** + * RingBuffer is a Writable Stream that just stores the last N entries in + * memory. + * + * @param options {Object}, with the following fields: + * + * - limit: number of entries to keep in memory + */ +function RingBuffer(options) { + this.limit = options && options.limit ? options.limit : 100; + this.writable = true; + this.entries = []; + EventEmitter.call(this); +} + +util.inherits(RingBuffer, EventEmitter); + +RingBuffer.prototype.write = function (str) { + var json; + + if (!this.writable) + throw (new Error('RingBuffer has been ended already')); + + try { + json = JSON.parse(str); + this.entries.push(json); + } catch (ex) { + this.entries.push(str); + } + + if (this.entries.length > this.limit) + this.entries.shift(); + + return (true); +}; + +RingBuffer.prototype.end = function () { + if (arguments.length > 0) + this.write.apply(this, Array.prototype.slice.call(arguments)); + this.writable = false; +}; + +RingBuffer.prototype.destroy = function () { + this.writable = false; + this.emit('close'); +}; + +RingBuffer.prototype.destroySoon = function () { + this.destroy(); + this.emit('close'); +}; + //---- Exports @@ -972,3 +1024,5 @@ module.exports.LOG_VERSION = LOG_VERSION; module.exports.createLogger = function createLogger(options) { return new Logger(options); }; + +module.exports.RingBuffer = RingBuffer; diff --git a/test/ringbuffer.test.js b/test/ringbuffer.test.js new file mode 100644 index 0000000..8a0329e --- /dev/null +++ b/test/ringbuffer.test.js @@ -0,0 +1,38 @@ +/* + * Test the RingBuffer output stream. + */ + +var test = require('tap').test; +var Logger = require('../lib/bunyan'); +var ringbuffer = new Logger.RingBuffer({ 'limit': 5 }); + +var log1 = new Logger({ + name: 'log1', + streams: [ + { + stream: ringbuffer, + level: 'info' + } + ] +}); + +test('ringbuffer', function (t) { + log1.info('hello'); + log1.trace('there'); + log1.error('chucklebucket'); + t.equal(ringbuffer.entries.length, 2); + t.equal(ringbuffer.entries[0]['msg'], 'hello'); + t.equal(ringbuffer.entries[1]['msg'], 'chucklebucket'); + log1.error('one'); + log1.error('two'); + log1.error('three'); + t.equal(ringbuffer.entries.length, 5); + log1.error('four'); + t.equal(ringbuffer.entries.length, 5); + t.equal(ringbuffer.entries[0]['msg'], 'chucklebucket'); + t.equal(ringbuffer.entries[1]['msg'], 'one'); + t.equal(ringbuffer.entries[2]['msg'], 'two'); + t.equal(ringbuffer.entries[3]['msg'], 'three'); + t.equal(ringbuffer.entries[4]['msg'], 'four'); + t.end(); +}); From cc6a2d0f6e65fb8f551618d551204a75429f9914 Mon Sep 17 00:00:00 2001 From: Dave Pacheco Date: Wed, 20 Jun 2012 10:39:56 -0700 Subject: [PATCH 3/4] code review comments --- lib/bunyan.js | 1 - test/ringbuffer.test.js | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/bunyan.js b/lib/bunyan.js index 52b5f99..e32c6ae 100644 --- a/lib/bunyan.js +++ b/lib/bunyan.js @@ -1003,7 +1003,6 @@ RingBuffer.prototype.destroy = function () { RingBuffer.prototype.destroySoon = function () { this.destroy(); - this.emit('close'); }; diff --git a/test/ringbuffer.test.js b/test/ringbuffer.test.js index 8a0329e..beefd3a 100644 --- a/test/ringbuffer.test.js +++ b/test/ringbuffer.test.js @@ -19,17 +19,17 @@ var log1 = new Logger({ test('ringbuffer', function (t) { log1.info('hello'); log1.trace('there'); - log1.error('chucklebucket'); + log1.error('android'); t.equal(ringbuffer.entries.length, 2); t.equal(ringbuffer.entries[0]['msg'], 'hello'); - t.equal(ringbuffer.entries[1]['msg'], 'chucklebucket'); + t.equal(ringbuffer.entries[1]['msg'], 'android'); log1.error('one'); log1.error('two'); log1.error('three'); t.equal(ringbuffer.entries.length, 5); log1.error('four'); t.equal(ringbuffer.entries.length, 5); - t.equal(ringbuffer.entries[0]['msg'], 'chucklebucket'); + t.equal(ringbuffer.entries[0]['msg'], 'android'); t.equal(ringbuffer.entries[1]['msg'], 'one'); t.equal(ringbuffer.entries[2]['msg'], 'two'); t.equal(ringbuffer.entries[3]['msg'], 'three'); From b378560eac8b4d13d8b9b6964f170d1fab37b705 Mon Sep 17 00:00:00 2001 From: Dave Pacheco Date: Wed, 20 Jun 2012 16:26:28 -0700 Subject: [PATCH 4/4] use raw stream support --- README.md | 2 ++ bin/bunyan | 4 ++-- examples/ringbuffer.js | 1 + lib/bunyan.js | 11 ++--------- test/ringbuffer.test.js | 1 + 5 files changed, 8 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 84b3de3..075181d 100644 --- a/README.md +++ b/README.md @@ -476,6 +476,7 @@ To use a RingBuffer: var ringbuffer = new bunyan.RingBuffer({ limit: 100 }); var log = new bunyan({ name: "foo", + raw: true, stream: ringbuffer, level: "debug" }); @@ -487,6 +488,7 @@ This example emits: [ { name: 'foo', hostname: '912d2b29', + raw: true, pid: 50346, level: 30, msg: 'hello world', diff --git a/bin/bunyan b/bin/bunyan index 1494ba6..70bee22 100755 --- a/bin/bunyan +++ b/bin/bunyan @@ -212,10 +212,10 @@ function emitNextRecord(opts, stylize) continue; if (streams[ofile].records.length > 0) { - if (!streams[ofile].paused) { + if (!streams[ofile].paused) { streams[ofile].paused = true; streams[ofile].stream.pause(); - } + } } else if (streams[ofile].paused) { streams[ofile].paused = false; streams[ofile].stream.resume(); diff --git a/examples/ringbuffer.js b/examples/ringbuffer.js index 06d4cb3..4dd7842 100644 --- a/examples/ringbuffer.js +++ b/examples/ringbuffer.js @@ -3,6 +3,7 @@ var bunyan = require('..'); var ringbuffer = new bunyan.RingBuffer({ limit: 100 }); var log = new bunyan({ name: 'foo', + raw: true, stream: ringbuffer, level: 'debug' }); diff --git a/lib/bunyan.js b/lib/bunyan.js index 3a531b9..875c8b9 100644 --- a/lib/bunyan.js +++ b/lib/bunyan.js @@ -992,18 +992,11 @@ function RingBuffer(options) { util.inherits(RingBuffer, EventEmitter); -RingBuffer.prototype.write = function (str) { - var json; - +RingBuffer.prototype.write = function (record) { if (!this.writable) throw (new Error('RingBuffer has been ended already')); - try { - json = JSON.parse(str); - this.entries.push(json); - } catch (ex) { - this.entries.push(str); - } + this.entries.push(record); if (this.entries.length > this.limit) this.entries.shift(); diff --git a/test/ringbuffer.test.js b/test/ringbuffer.test.js index beefd3a..3589be5 100644 --- a/test/ringbuffer.test.js +++ b/test/ringbuffer.test.js @@ -11,6 +11,7 @@ var log1 = new Logger({ streams: [ { stream: ringbuffer, + raw: true, level: 'info' } ]