sort output from multiple sources

This commit is contained in:
Dave Pacheco 2012-05-11 16:22:28 -07:00
parent 4f28afc7a2
commit 123d493d27
2 changed files with 156 additions and 42 deletions

View file

@ -28,7 +28,7 @@ var OM_FROM_NAME = {
"json": OM_JSON,
"inspect": OM_INSPECT,
"simple": OM_SIMPLE
}
};
// Levels
@ -58,8 +58,6 @@ Object.keys(levelFromName).forEach(function (name) {
name.length === 4 ? ' ' : '') + name.toUpperCase();
});
//---- support functions
function getVersion() {
@ -108,7 +106,7 @@ if (!format) {
}
function indent(s) {
return ' ' + s.split(/\r?\n/).join('\n ')
return ' ' + s.split(/\r?\n/).join('\n ');
}
function objCopy(obj) {
@ -141,7 +139,7 @@ function printHelp() {
util.puts(" --no-color Force no coloring (e.g. terminal doesn't support it)");
util.puts(" -o, --output MODE");
util.puts(" Specify an output mode/format. One of");
util.puts(" paul: (the default) pretty")
util.puts(" paul: (the default) pretty");
util.puts(" json: JSON output, 2-space indent");
util.puts(" json-N: JSON output, N-space indent, e.g. 'json-4'");
util.puts(" inspect: node.js `util.inspect` output");
@ -150,6 +148,91 @@ function printHelp() {
util.puts("See <https://github.com/trentm/node-bunyan> for more complete docs.");
}
/*
* If the user specifies multiple input sources, we want to print out records
* from all sources in a single, chronologically ordered stream. To do this
* efficiently, we first assume that all records within each source are ordered
* already, so we need only keep track of the next record in each source and
* the time of the last record emitted. To avoid excess memory usage, we
* pause() streams that are ahead of others.
*
* "streams" is an object indexed by source name (file name) which specifies:
*
* stream Actual stream object, so that we can pause and resume it.
*
* records Array of log records we've read, but not yet emitted. Each
* record includes "line" (the raw line), "rec" (the JSON
* record), and "time" (the parsed time value).
*
 *    done		Whether the stream has finished, i.e. will produce no
 *    			more records (set when the underlying stream ends).
*/
var streams = {};
function gotRecord(file, line, rec, opts, stylize)
{
var time = new Date(rec.time);
streams[file]['records'].push({ line: line, rec: rec, time: time });
emitNextRecord(opts, stylize);
}
function emitNextRecord(opts, stylize)
{
var ofile, ready, minfile, rec;
for (;;) {
/*
* Take a first pass through the input streams to see if we have a record
* from all of them. If not, we'll pause any streams for which we do
* already have a record (to avoid consuming excess memory) and then wait
* until we have records from the others before emitting the next record.
*
* As part of the same pass, we look for the earliest record we have not yet
* emitted.
*/
minfile = undefined;
ready = true;
for (ofile in streams) {
if (streams[ofile].stream === null ||
(!streams[ofile].done && streams[ofile].records.length === 0)) {
ready = false;
break;
}
if (streams[ofile].records.length > 0 && (minfile === undefined ||
streams[minfile].records[0].time > streams[ofile].records[0].time)) {
minfile = ofile;
}
}
if (!ready) {
for (ofile in streams) {
if (!streams[ofile].stream || streams[ofile].done)
continue;
if (streams[ofile].records.length > 0) {
if (!streams[ofile].paused) {
streams[ofile].paused = true;
streams[ofile].stream.pause();
}
} else if (streams[ofile].paused) {
streams[ofile].paused = false;
streams[ofile].stream.resume();
}
}
return;
}
/*
* Emit the next record for "minfile", and invoke ourselves again to make
* sure we emit as many records as we can right now.
*/
rec = streams[minfile].records.shift();
emitRecord(rec.rec, rec.line, opts, stylize);
}
}
/**
* Parse the command-line options and arguments into an object.
@ -180,7 +263,7 @@ function parseArgv(argv) {
if (args[i].charAt(0) === "-" && args[i].charAt(1) !== '-' && args[i].length > 2) {
var splitOpts = args[i].slice(1).split("");
for (var j = 0; j < splitOpts.length; j++) {
newArgs.push('-' + splitOpts[j])
newArgs.push('-' + splitOpts[j]);
if (optTakesArg[splitOpts[j]]) {
var optArg = splitOpts.slice(j+1).join("");
if (optArg.length) {
@ -195,7 +278,7 @@ function parseArgv(argv) {
}
args = newArgs;
endOfOptions = false;
var endOfOptions = false;
while (args.length > 0) {
var arg = args.shift();
switch(arg) {
@ -291,7 +374,7 @@ function stylizeWithoutColor(str, color) {
/**
* Is this a valid Bunyna log record.
* Is this a valid Bunyan log record.
*/
function isValidRecord(rec) {
if (rec.v == null ||
@ -310,10 +393,11 @@ function isValidRecord(rec) {
/**
* Print out a single result, considering input options.
* Parses the given log line and either emits it right away (for invalid
* records) or enqueues it for emitting later when it's the next line to show.
*/
function handleLogLine(line, opts, stylize) {
// Handle non-JSON lines.
function handleLogLine(file, line, opts, stylize) {
// Emit non-JSON lines immediately.
var rec;
if (!line) {
return emit(line + '\n');
@ -327,6 +411,16 @@ function handleLogLine(line, opts, stylize) {
}
}
if (file === null || !isValidRecord(rec))
return emitRecord(rec, line, opts, stylize);
return gotRecord(file, line, rec, opts, stylize);
}
/**
* Print out a single result, considering input options.
*/
function emitRecord(rec, line, opts, stylize) {
switch (opts.outputMode) {
case OM_PAUL:
// [time] LEVEL: name[/component]/pid on hostname (src): msg* (extras...)
@ -367,8 +461,8 @@ function handleLogLine(line, opts, stylize) {
40: 'magenta', // WARN
50: 'red', // ERROR
60: 'inverse', // FATAL
}
level = stylize(level, colorFromLevel[rec.level])
};
level = stylize(level, colorFromLevel[rec.level]);
}
delete rec.level;
@ -414,7 +508,7 @@ function handleLogLine(line, opts, stylize) {
details.push(indent(format("%s %s HTTP/1.1\n%s", rec.req.method,
rec.req.url,
Object.keys(headers).map(
function (h) { return h + ': ' + headers[h]}).join('\n'))));
function (h) { return h + ': ' + headers[h]; }).join('\n'))));
}
delete rec.req;
@ -431,14 +525,14 @@ function handleLogLine(line, opts, stylize) {
rec.client_req.url,
hostHeaderLine,
Object.keys(headers).map(
function (h) { return h + ': ' + headers[h]}).join('\n'))));
function (h) { return h + ': ' + headers[h]; }).join('\n'))));
}
delete rec.client_req;
if (rec.res) {
var s = '';
if (rec.res.header) {
s += rec.res.header.trimRight()
s += rec.res.header.trimRight();
} else if (rec.res.headers) {
if (rec.res.statusCode) {
s += format("HTTP/1.1 %s %s\n", rec.res.statusCode,
@ -446,7 +540,7 @@ function handleLogLine(line, opts, stylize) {
}
var headers = rec.res.headers;
s += Object.keys(headers).map(
function (h) { return h + ': ' + headers[h]}).join('\n');
function (h) { return h + ': ' + headers[h]; }).join('\n');
}
if (s) {
details.push(indent(s));
@ -455,7 +549,7 @@ function handleLogLine(line, opts, stylize) {
delete rec.res;
if (rec.err && rec.err.stack) {
details.push(indent(rec.err.stack))
details.push(indent(rec.err.stack));
delete rec.err;
}
@ -463,19 +557,18 @@ function handleLogLine(line, opts, stylize) {
for (var i = 0; i < leftover.length; i++) {
var key = leftover[i];
var value = rec[key];
var type = typeof(value);
var stringified = false;
if (typeof(value) !== 'string') {
value = JSON.stringify(value, null, 2);
stringified = true;
}
if (value.indexOf('\n') !== -1 || value.length > 50) {
details.push(indent(key + ': ' + value))
details.push(indent(key + ': ' + value));
} else if (!stringified && (value.indexOf(' ') != -1 ||
value.length === 0)) {
extras.push(key + '=' + JSON.stringify(value))
extras.push(key + '=' + JSON.stringify(value));
} else {
extras.push(key + '=' + value)
extras.push(key + '=' + value);
}
}
@ -531,7 +624,7 @@ process.stdout.on("error", function (err) {
if (err.code === "EPIPE") {
// Pass. See <https://github.com/trentm/json/issues/9>.
} else {
warn(err)
warn(err);
drainStdoutAndExit(1);
}
});
@ -578,17 +671,17 @@ function processStdin(opts, stylize, callback) {
}
if (length > 1) {
handleLogLine(leftover + lines[0], opts, stylize);
handleLogLine(null, leftover + lines[0], opts, stylize);
}
leftover = lines.pop();
length -= 1;
for (var i=1; i < length; i++) {
handleLogLine(lines[i], opts, stylize);
handleLogLine(null, lines[i], opts, stylize);
}
});
stdin.on('end', function () {
if (leftover) {
handleLogLine(leftover, opts, stylize);
handleLogLine(null, leftover, opts, stylize);
leftover = '';
}
callback();
@ -607,9 +700,11 @@ function processStdin(opts, stylize, callback) {
function processFile(file, opts, stylize, callback) {
var stream = fs.createReadStream(file, {encoding: 'utf8'});
streams[file].stream = stream;
stream.on('error', function (err) {
callback(err);
})
});
var leftover = ''; // Left-over partial line from last chunk.
stream.on('data', function (chunk) {
@ -621,20 +716,21 @@ function processFile(file, opts, stylize, callback) {
}
if (length > 1) {
handleLogLine(leftover + lines[0], opts, stylize);
handleLogLine(file, leftover + lines[0], opts, stylize);
}
leftover = lines.pop();
length -= 1;
for (var i=1; i < length; i++) {
handleLogLine(lines[i], opts, stylize);
handleLogLine(file, lines[i], opts, stylize);
}
});
stream.on('end', function () {
if (leftover) {
handleLogLine(leftover, opts, stylize);
handleLogLine(file, leftover, opts, stylize);
leftover = '';
}
streams[file].done = true;
callback();
});
}
@ -643,13 +739,14 @@ function processFile(file, opts, stylize, callback) {
/**
* From node async module.
*/
function asyncForEachSeries(arr, iterator, callback) {
function asyncForEach(arr, iterator, callback) {
callback = callback || function () {};
if (!arr.length) {
return callback();
}
var completed = 0;
var iterate = function () {
iterator(arr[completed], function (err) {
arr.forEach(function (x) {
iterator(x, function (err) {
if (err) {
callback(err);
callback = function () {};
@ -659,13 +756,9 @@ function asyncForEachSeries(arr, iterator, callback) {
if (completed === arr.length) {
callback();
}
else {
iterate();
}
}
});
};
iterate();
});
};
@ -677,7 +770,7 @@ function main(argv) {
try {
opts = parseArgv(argv);
} catch (e) {
warn("bunyan: error: %s", e.message)
warn("bunyan: error: %s", e.message);
return drainStdoutAndExit(1);
}
if (opts.help) {
@ -693,8 +786,10 @@ function main(argv) {
var retval = 0;
if (opts.args.length > 0) {
var files = opts.args;
var i = 0;
asyncForEachSeries(files,
files.forEach(function (file) {
streams[file] = { stream: null, records: [], done: false }
});
asyncForEach(files,
function (file, next) {
processFile(file, opts, stylize, function (err) {
if (err) {
@ -715,7 +810,7 @@ function main(argv) {
} else {
processStdin(opts, stylize, function () {
process.exit(retval);
})
});
}
}

View file

@ -149,3 +149,22 @@ test('simple.log doesnotexist1.log doesnotexist2.log', function (t) {
}
);
});
test('multiple logs', function (t) {
exec(BUNYAN + ' corpus/log1.log corpus/log2.log',
function (err, stdout, stderr) {
t.error(err);
t.equal(stdout, [
'[2012-05-08T16:57:55.586Z] INFO: agent1/73267 on headnode: message\n',
'[2012-05-08T16:58:55.586Z] INFO: agent2/73267 on headnode: message\n',
'[2012-05-08T17:01:49.339Z] INFO: agent2/73267 on headnode: message\n',
'[2012-05-08T17:02:47.404Z] INFO: agent2/73267 on headnode: message\n',
'[2012-05-08T17:02:49.339Z] INFO: agent1/73267 on headnode: message\n',
'[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n',
'[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n',
'[2012-05-08T17:02:57.404Z] INFO: agent2/73267 on headnode: message\n',
'[2012-05-08T17:08:01.105Z] INFO: agent2/76156 on headnode: message\n',
].join(''));
t.end();
});
});