From 123d493d275b4a77bc986800abaf5e5c482801d1 Mon Sep 17 00:00:00 2001 From: Dave Pacheco Date: Fri, 11 May 2012 16:22:28 -0700 Subject: [PATCH] sort output from multiple sources --- bin/bunyan | 179 ++++++++++++++++++++++++++++++++++++----------- test/cli.test.js | 19 +++++ 2 files changed, 156 insertions(+), 42 deletions(-) diff --git a/bin/bunyan b/bin/bunyan index 2ce958c..3bad2e9 100755 --- a/bin/bunyan +++ b/bin/bunyan @@ -28,7 +28,7 @@ var OM_FROM_NAME = { "json": OM_JSON, "inspect": OM_INSPECT, "simple": OM_SIMPLE -} +}; // Levels @@ -58,8 +58,6 @@ Object.keys(levelFromName).forEach(function (name) { name.length === 4 ? ' ' : '') + name.toUpperCase(); }); - - //---- support functions function getVersion() { @@ -108,7 +106,7 @@ if (!format) { } function indent(s) { - return ' ' + s.split(/\r?\n/).join('\n ') + return ' ' + s.split(/\r?\n/).join('\n '); } function objCopy(obj) { @@ -141,7 +139,7 @@ function printHelp() { util.puts(" --no-color Force no coloring (e.g. terminal doesn't support it)"); util.puts(" -o, --output MODE"); util.puts(" Specify an output mode/format. One of"); - util.puts(" paul: (the default) pretty") + util.puts(" paul: (the default) pretty"); util.puts(" json: JSON output, 2-space indent"); util.puts(" json-N: JSON output, N-space indent, e.g. 'json-4'"); util.puts(" inspect: node.js `util.inspect` output"); @@ -150,6 +148,91 @@ function printHelp() { util.puts("See for more complete docs."); } +/* + * If the user specifies multiple input sources, we want to print out records + * from all sources in a single, chronologically ordered stream. To do this + * efficiently, we first assume that all records within each source are ordered + * already, so we need only keep track of the next record in each source and + * the time of the last record emitted. To avoid excess memory usage, we + * pause() streams that are ahead of others. + * + * "streams" is an object indexed by source name (file name) which specifies: + * + * stream Actual stream object, so that we can pause and resume it. + * + * records Array of log records we've read, but not yet emitted. Each + * record includes "line" (the raw line), "rec" (the JSON + * record), and "time" (the parsed time value). + * + * done Whether the stream has any more records to emit. + */ +var streams = {}; + +function gotRecord(file, line, rec, opts, stylize) +{ + var time = new Date(rec.time); + + streams[file]['records'].push({ line: line, rec: rec, time: time }); + emitNextRecord(opts, stylize); +} + +function emitNextRecord(opts, stylize) +{ + var ofile, ready, minfile, rec; + + for (;;) { + /* + * Take a first pass through the input streams to see if we have a record + * from all of them. If not, we'll pause any streams for which we do + * already have a record (to avoid consuming excess memory) and then wait + * until we have records from the others before emitting the next record. + * + * As part of the same pass, we look for the earliest record we have not yet + * emitted. + */ + minfile = undefined; + ready = true; + for (ofile in streams) { + + if (streams[ofile].stream === null || + (!streams[ofile].done && streams[ofile].records.length === 0)) { + ready = false; + break; + } + + if (streams[ofile].records.length > 0 && (minfile === undefined || + streams[minfile].records[0].time > streams[ofile].records[0].time)) { + minfile = ofile; + } + } + + if (!ready) { + for (ofile in streams) { + if (!streams[ofile].stream || streams[ofile].done) + continue; + + if (streams[ofile].records.length > 0) { + if (!streams[ofile].paused) { + streams[ofile].paused = true; + streams[ofile].stream.pause(); + } + } else if (streams[ofile].paused) { + streams[ofile].paused = false; + streams[ofile].stream.resume(); + } + } + + return; + } + + /* + * Emit the next record for "minfile", and invoke ourselves again to make + * sure we emit as many records as we can right now. + */ + rec = streams[minfile].records.shift(); + emitRecord(rec.rec, rec.line, opts, stylize); + } +} /** * Parse the command-line options and arguments into an object. @@ -180,7 +263,7 @@ function parseArgv(argv) { if (args[i].charAt(0) === "-" && args[i].charAt(1) !== '-' && args[i].length > 2) { var splitOpts = args[i].slice(1).split(""); for (var j = 0; j < splitOpts.length; j++) { - newArgs.push('-' + splitOpts[j]) + newArgs.push('-' + splitOpts[j]); if (optTakesArg[splitOpts[j]]) { var optArg = splitOpts.slice(j+1).join(""); if (optArg.length) { @@ -195,7 +278,7 @@ function parseArgv(argv) { } args = newArgs; - endOfOptions = false; + var endOfOptions = false; while (args.length > 0) { var arg = args.shift(); switch(arg) { @@ -291,7 +374,7 @@ function stylizeWithoutColor(str, color) { /** - * Is this a valid Bunyna log record. + * Is this a valid Bunyan log record. */ function isValidRecord(rec) { if (rec.v == null || @@ -310,10 +393,11 @@ function isValidRecord(rec) { /** - * Print out a single result, considering input options. + * Parses the given log line and either emits it right away (for invalid + * records) or enqueues it for emitting later when it's the next line to show. */ -function handleLogLine(line, opts, stylize) { - // Handle non-JSON lines. +function handleLogLine(file, line, opts, stylize) { + // Emit non-JSON lines immediately. var rec; if (!line) { return emit(line + '\n'); @@ -327,6 +411,16 @@ function handleLogLine(line, opts, stylize) { } } + if (file === null || !isValidRecord(rec)) + return emitRecord(rec, line, opts, stylize); + + return gotRecord(file, line, rec, opts, stylize); +} + +/** + * Print out a single result, considering input options. + */ +function emitRecord(rec, line, opts, stylize) { switch (opts.outputMode) { case OM_PAUL: // [time] LEVEL: name[/component]/pid on hostname (src): msg* (extras...) @@ -367,8 +461,8 @@ function handleLogLine(line, opts, stylize) { 40: 'magenta', // WARN 50: 'red', // ERROR 60: 'inverse', // FATAL - } - level = stylize(level, colorFromLevel[rec.level]) + }; + level = stylize(level, colorFromLevel[rec.level]); } delete rec.level; @@ -414,7 +508,7 @@ function handleLogLine(line, opts, stylize) { details.push(indent(format("%s %s HTTP/1.1\n%s", rec.req.method, rec.req.url, Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n')))); + function (h) { return h + ': ' + headers[h]; }).join('\n')))); } delete rec.req; @@ -431,14 +525,14 @@ function handleLogLine(line, opts, stylize) { rec.client_req.url, hostHeaderLine, Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n')))); + function (h) { return h + ': ' + headers[h]; }).join('\n')))); } delete rec.client_req; if (rec.res) { var s = ''; if (rec.res.header) { - s += rec.res.header.trimRight() + s += rec.res.header.trimRight(); } else if (rec.res.headers) { if (rec.res.statusCode) { s += format("HTTP/1.1 %s %s\n", rec.res.statusCode, @@ -446,7 +540,7 @@ function handleLogLine(line, opts, stylize) { } var headers = rec.res.headers; s += Object.keys(headers).map( - function (h) { return h + ': ' + headers[h]}).join('\n'); + function (h) { return h + ': ' + headers[h]; }).join('\n'); } if (s) { details.push(indent(s)); @@ -455,7 +549,7 @@ function handleLogLine(line, opts, stylize) { delete rec.res; if (rec.err && rec.err.stack) { - details.push(indent(rec.err.stack)) + details.push(indent(rec.err.stack)); delete rec.err; } @@ -463,19 +557,18 @@ function handleLogLine(line, opts, stylize) { for (var i = 0; i < leftover.length; i++) { var key = leftover[i]; var value = rec[key]; - var type = typeof(value); var stringified = false; if (typeof(value) !== 'string') { value = JSON.stringify(value, null, 2); stringified = true; } if (value.indexOf('\n') !== -1 || value.length > 50) { - details.push(indent(key + ': ' + value)) + details.push(indent(key + ': ' + value)); } else if (!stringified && (value.indexOf(' ') != -1 || value.length === 0)) { - extras.push(key + '=' + JSON.stringify(value)) + extras.push(key + '=' + JSON.stringify(value)); } else { - extras.push(key + '=' + value) + extras.push(key + '=' + value); } } @@ -531,7 +624,7 @@ process.stdout.on("error", function (err) { if (err.code === "EPIPE") { // Pass. See . } else { - warn(err) + warn(err); drainStdoutAndExit(1); } }); @@ -578,17 +671,17 @@ function processStdin(opts, stylize, callback) { } if (length > 1) { - handleLogLine(leftover + lines[0], opts, stylize); + handleLogLine(null, leftover + lines[0], opts, stylize); } leftover = lines.pop(); length -= 1; for (var i=1; i < length; i++) { - handleLogLine(lines[i], opts, stylize); + handleLogLine(null, lines[i], opts, stylize); } }); stdin.on('end', function () { if (leftover) { - handleLogLine(leftover, opts, stylize); + handleLogLine(null, leftover, opts, stylize); leftover = ''; } callback(); @@ -607,9 +700,11 @@ function processStdin(opts, stylize, callback) { function processFile(file, opts, stylize, callback) { var stream = fs.createReadStream(file, {encoding: 'utf8'}); + streams[file].stream = stream; + stream.on('error', function (err) { callback(err); - }) + }); var leftover = ''; // Left-over partial line from last chunk. stream.on('data', function (chunk) { @@ -621,20 +716,21 @@ function processFile(file, opts, stylize, callback) { } if (length > 1) { - handleLogLine(leftover + lines[0], opts, stylize); + handleLogLine(file, leftover + lines[0], opts, stylize); } leftover = lines.pop(); length -= 1; for (var i=1; i < length; i++) { - handleLogLine(lines[i], opts, stylize); + handleLogLine(file, lines[i], opts, stylize); } }); stream.on('end', function () { if (leftover) { - handleLogLine(leftover, opts, stylize); + handleLogLine(file, leftover, opts, stylize); leftover = ''; } + streams[file].done = true; callback(); }); } @@ -643,13 +739,14 @@ function processFile(file, opts, stylize, callback) { /** * From node async module. */ -function asyncForEachSeries(arr, iterator, callback) { +function asyncForEach(arr, iterator, callback) { + callback = callback || function () {}; if (!arr.length) { return callback(); } var completed = 0; - var iterate = function () { - iterator(arr[completed], function (err) { + arr.forEach(function (x) { + iterator(x, function (err) { if (err) { callback(err); callback = function () {}; @@ -659,13 +756,9 @@ function asyncForEachSeries(arr, iterator, callback) { if (completed === arr.length) { callback(); } - else { - iterate(); - } } }); - }; - iterate(); + }); }; @@ -677,7 +770,7 @@ function main(argv) { try { opts = parseArgv(argv); } catch (e) { - warn("bunyan: error: %s", e.message) + warn("bunyan: error: %s", e.message); return drainStdoutAndExit(1); } if (opts.help) { @@ -693,8 +786,10 @@ function main(argv) { var retval = 0; if (opts.args.length > 0) { var files = opts.args; - var i = 0; - asyncForEachSeries(files, + files.forEach(function (file) { + streams[file] = { stream: null, records: [], done: false } + }); + asyncForEach(files, function (file, next) { processFile(file, opts, stylize, function (err) { if (err) { @@ -715,7 +810,7 @@ function main(argv) { } else { processStdin(opts, stylize, function () { process.exit(retval); - }) + }); } } diff --git a/test/cli.test.js b/test/cli.test.js index bf1fe9e..9ba871b 100644 --- a/test/cli.test.js +++ b/test/cli.test.js @@ -149,3 +149,22 @@ test('simple.log doesnotexist1.log doesnotexist2.log', function (t) { } ); }); + +test('multiple logs', function (t) { + exec(BUNYAN + ' corpus/log1.log corpus/log2.log', + function (err, stdout, stderr) { + t.error(err); + t.equal(stdout, [ + '[2012-05-08T16:57:55.586Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T16:58:55.586Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:01:49.339Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:02:47.404Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:02:49.339Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n', + '[2012-05-08T17:02:57.404Z] INFO: agent2/73267 on headnode: message\n', + '[2012-05-08T17:08:01.105Z] INFO: agent2/76156 on headnode: message\n', + ].join('')); + t.end(); + }); +});