Merge pull request #15 from davepacheco/master
bunyan library: RingBuffer stream class. bunyan cli: Sort output from multiple sources.
This commit is contained in:
commit
aa5440c540
7 changed files with 285 additions and 43 deletions
32
README.md
32
README.md
|
@ -463,6 +463,38 @@ Supported stream types are:
|
|||
// Handle stream write or create error here.
|
||||
});
|
||||
|
||||
Bunyan comes with a special stream called a RingBuffer which keeps the last N
|
||||
records in memory and does *not* write the data anywhere else. One common
|
||||
strategy is to log 'info' and higher to a normal log file but log all records
|
||||
(including 'trace') to a ringbuffer that you can access via a debugger, or your
|
||||
own HTTP interface, or a post-mortem facility like MDB or node-panic.
|
||||
|
||||
To use a RingBuffer:
|
||||
|
||||
/* Create a ring buffer that stores the last 100 entries. */
|
||||
var bunyan = require('bunyan');
|
||||
var ringbuffer = new bunyan.RingBuffer({ limit: 100 });
|
||||
var log = new bunyan({
|
||||
name: "foo",
|
||||
raw: true,
|
||||
stream: ringbuffer,
|
||||
level: "debug"
|
||||
});
|
||||
|
||||
log.info('hello world');
|
||||
console.log(ringbuffer.entries);
|
||||
|
||||
This example emits:
|
||||
|
||||
[ { name: 'foo',
|
||||
hostname: '912d2b29',
|
||||
raw: true,
|
||||
pid: 50346,
|
||||
level: 30,
|
||||
msg: 'hello world',
|
||||
time: '2012-06-19T21:34:19.906Z',
|
||||
v: 0 } ]
|
||||
|
||||
|
||||
# License
|
||||
|
||||
|
|
1
TODO.md
1
TODO.md
|
@ -15,7 +15,6 @@
|
|||
works *and* that an existing field in the parent is not *re-serialized*.
|
||||
- a "rolling-file" stream: but specifically by time, e.g. hourly. (MarkC
|
||||
requested)
|
||||
- ringBuffer stream
|
||||
- split out `bunyan` cli to a "bunyan" or "bunyan-reader" or "node-bunyan-reader"
|
||||
as the basis for tools to consume bunyan logs. It can grow indep of node-bunyan
|
||||
for generating the logs.
|
||||
|
|
179
bin/bunyan
179
bin/bunyan
|
@ -28,7 +28,7 @@ var OM_FROM_NAME = {
|
|||
"json": OM_JSON,
|
||||
"inspect": OM_INSPECT,
|
||||
"simple": OM_SIMPLE
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// Levels
|
||||
|
@ -58,8 +58,6 @@ Object.keys(levelFromName).forEach(function (name) {
|
|||
name.length === 4 ? ' ' : '') + name.toUpperCase();
|
||||
});
|
||||
|
||||
|
||||
|
||||
//---- support functions
|
||||
|
||||
function getVersion() {
|
||||
|
@ -108,7 +106,7 @@ if (!format) {
|
|||
}
|
||||
|
||||
function indent(s) {
|
||||
return ' ' + s.split(/\r?\n/).join('\n ')
|
||||
return ' ' + s.split(/\r?\n/).join('\n ');
|
||||
}
|
||||
|
||||
function objCopy(obj) {
|
||||
|
@ -141,7 +139,7 @@ function printHelp() {
|
|||
util.puts(" --no-color Force no coloring (e.g. terminal doesn't support it)");
|
||||
util.puts(" -o, --output MODE");
|
||||
util.puts(" Specify an output mode/format. One of");
|
||||
util.puts(" paul: (the default) pretty")
|
||||
util.puts(" paul: (the default) pretty");
|
||||
util.puts(" json: JSON output, 2-space indent");
|
||||
util.puts(" json-N: JSON output, N-space indent, e.g. 'json-4'");
|
||||
util.puts(" inspect: node.js `util.inspect` output");
|
||||
|
@ -150,6 +148,91 @@ function printHelp() {
|
|||
util.puts("See <https://github.com/trentm/node-bunyan> for more complete docs.");
|
||||
}
|
||||
|
||||
/*
|
||||
* If the user specifies multiple input sources, we want to print out records
|
||||
* from all sources in a single, chronologically ordered stream. To do this
|
||||
* efficiently, we first assume that all records within each source are ordered
|
||||
* already, so we need only keep track of the next record in each source and
|
||||
* the time of the last record emitted. To avoid excess memory usage, we
|
||||
* pause() streams that are ahead of others.
|
||||
*
|
||||
* "streams" is an object indexed by source name (file name) which specifies:
|
||||
*
|
||||
* stream Actual stream object, so that we can pause and resume it.
|
||||
*
|
||||
* records Array of log records we've read, but not yet emitted. Each
|
||||
* record includes "line" (the raw line), "rec" (the JSON
|
||||
* record), and "time" (the parsed time value).
|
||||
*
|
||||
* done Whether the stream has any more records to emit.
|
||||
*/
|
||||
var streams = {};
|
||||
|
||||
function gotRecord(file, line, rec, opts, stylize)
|
||||
{
|
||||
var time = new Date(rec.time);
|
||||
|
||||
streams[file]['records'].push({ line: line, rec: rec, time: time });
|
||||
emitNextRecord(opts, stylize);
|
||||
}
|
||||
|
||||
function emitNextRecord(opts, stylize)
|
||||
{
|
||||
var ofile, ready, minfile, rec;
|
||||
|
||||
for (;;) {
|
||||
/*
|
||||
* Take a first pass through the input streams to see if we have a record
|
||||
* from all of them. If not, we'll pause any streams for which we do
|
||||
* already have a record (to avoid consuming excess memory) and then wait
|
||||
* until we have records from the others before emitting the next record.
|
||||
*
|
||||
* As part of the same pass, we look for the earliest record we have not yet
|
||||
* emitted.
|
||||
*/
|
||||
minfile = undefined;
|
||||
ready = true;
|
||||
for (ofile in streams) {
|
||||
|
||||
if (streams[ofile].stream === null ||
|
||||
(!streams[ofile].done && streams[ofile].records.length === 0)) {
|
||||
ready = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (streams[ofile].records.length > 0 && (minfile === undefined ||
|
||||
streams[minfile].records[0].time > streams[ofile].records[0].time)) {
|
||||
minfile = ofile;
|
||||
}
|
||||
}
|
||||
|
||||
if (!ready) {
|
||||
for (ofile in streams) {
|
||||
if (!streams[ofile].stream || streams[ofile].done)
|
||||
continue;
|
||||
|
||||
if (streams[ofile].records.length > 0) {
|
||||
if (!streams[ofile].paused) {
|
||||
streams[ofile].paused = true;
|
||||
streams[ofile].stream.pause();
|
||||
}
|
||||
} else if (streams[ofile].paused) {
|
||||
streams[ofile].paused = false;
|
||||
streams[ofile].stream.resume();
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Emit the next record for "minfile", and invoke ourselves again to make
|
||||
* sure we emit as many records as we can right now.
|
||||
*/
|
||||
rec = streams[minfile].records.shift();
|
||||
emitRecord(rec.rec, rec.line, opts, stylize);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the command-line options and arguments into an object.
|
||||
|
@ -180,7 +263,7 @@ function parseArgv(argv) {
|
|||
if (args[i].charAt(0) === "-" && args[i].charAt(1) !== '-' && args[i].length > 2) {
|
||||
var splitOpts = args[i].slice(1).split("");
|
||||
for (var j = 0; j < splitOpts.length; j++) {
|
||||
newArgs.push('-' + splitOpts[j])
|
||||
newArgs.push('-' + splitOpts[j]);
|
||||
if (optTakesArg[splitOpts[j]]) {
|
||||
var optArg = splitOpts.slice(j+1).join("");
|
||||
if (optArg.length) {
|
||||
|
@ -195,7 +278,7 @@ function parseArgv(argv) {
|
|||
}
|
||||
args = newArgs;
|
||||
|
||||
endOfOptions = false;
|
||||
var endOfOptions = false;
|
||||
while (args.length > 0) {
|
||||
var arg = args.shift();
|
||||
switch(arg) {
|
||||
|
@ -291,7 +374,7 @@ function stylizeWithoutColor(str, color) {
|
|||
|
||||
|
||||
/**
|
||||
* Is this a valid Bunyna log record.
|
||||
* Is this a valid Bunyan log record.
|
||||
*/
|
||||
function isValidRecord(rec) {
|
||||
if (rec.v == null ||
|
||||
|
@ -310,10 +393,11 @@ function isValidRecord(rec) {
|
|||
|
||||
|
||||
/**
|
||||
* Print out a single result, considering input options.
|
||||
* Parses the given log line and either emits it right away (for invalid
|
||||
* records) or enqueues it for emitting later when it's the next line to show.
|
||||
*/
|
||||
function handleLogLine(line, opts, stylize) {
|
||||
// Handle non-JSON lines.
|
||||
function handleLogLine(file, line, opts, stylize) {
|
||||
// Emit non-JSON lines immediately.
|
||||
var rec;
|
||||
if (!line) {
|
||||
return emit(line + '\n');
|
||||
|
@ -327,6 +411,16 @@ function handleLogLine(line, opts, stylize) {
|
|||
}
|
||||
}
|
||||
|
||||
if (file === null || !isValidRecord(rec))
|
||||
return emitRecord(rec, line, opts, stylize);
|
||||
|
||||
return gotRecord(file, line, rec, opts, stylize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Print out a single result, considering input options.
|
||||
*/
|
||||
function emitRecord(rec, line, opts, stylize) {
|
||||
switch (opts.outputMode) {
|
||||
case OM_PAUL:
|
||||
// [time] LEVEL: name[/component]/pid on hostname (src): msg* (extras...)
|
||||
|
@ -367,8 +461,8 @@ function handleLogLine(line, opts, stylize) {
|
|||
40: 'magenta', // WARN
|
||||
50: 'red', // ERROR
|
||||
60: 'inverse', // FATAL
|
||||
}
|
||||
level = stylize(level, colorFromLevel[rec.level])
|
||||
};
|
||||
level = stylize(level, colorFromLevel[rec.level]);
|
||||
}
|
||||
delete rec.level;
|
||||
|
||||
|
@ -414,7 +508,7 @@ function handleLogLine(line, opts, stylize) {
|
|||
details.push(indent(format("%s %s HTTP/1.1\n%s", rec.req.method,
|
||||
rec.req.url,
|
||||
Object.keys(headers).map(
|
||||
function (h) { return h + ': ' + headers[h]}).join('\n'))));
|
||||
function (h) { return h + ': ' + headers[h]; }).join('\n'))));
|
||||
}
|
||||
delete rec.req;
|
||||
|
||||
|
@ -431,14 +525,14 @@ function handleLogLine(line, opts, stylize) {
|
|||
rec.client_req.url,
|
||||
hostHeaderLine,
|
||||
Object.keys(headers).map(
|
||||
function (h) { return h + ': ' + headers[h]}).join('\n'))));
|
||||
function (h) { return h + ': ' + headers[h]; }).join('\n'))));
|
||||
}
|
||||
delete rec.client_req;
|
||||
|
||||
if (rec.res) {
|
||||
var s = '';
|
||||
if (rec.res.header) {
|
||||
s += rec.res.header.trimRight()
|
||||
s += rec.res.header.trimRight();
|
||||
} else if (rec.res.headers) {
|
||||
if (rec.res.statusCode) {
|
||||
s += format("HTTP/1.1 %s %s\n", rec.res.statusCode,
|
||||
|
@ -446,7 +540,7 @@ function handleLogLine(line, opts, stylize) {
|
|||
}
|
||||
var headers = rec.res.headers;
|
||||
s += Object.keys(headers).map(
|
||||
function (h) { return h + ': ' + headers[h]}).join('\n');
|
||||
function (h) { return h + ': ' + headers[h]; }).join('\n');
|
||||
}
|
||||
if (s) {
|
||||
details.push(indent(s));
|
||||
|
@ -455,7 +549,7 @@ function handleLogLine(line, opts, stylize) {
|
|||
delete rec.res;
|
||||
|
||||
if (rec.err && rec.err.stack) {
|
||||
details.push(indent(rec.err.stack))
|
||||
details.push(indent(rec.err.stack));
|
||||
delete rec.err;
|
||||
}
|
||||
|
||||
|
@ -463,19 +557,18 @@ function handleLogLine(line, opts, stylize) {
|
|||
for (var i = 0; i < leftover.length; i++) {
|
||||
var key = leftover[i];
|
||||
var value = rec[key];
|
||||
var type = typeof(value);
|
||||
var stringified = false;
|
||||
if (typeof(value) !== 'string') {
|
||||
value = JSON.stringify(value, null, 2);
|
||||
stringified = true;
|
||||
}
|
||||
if (value.indexOf('\n') !== -1 || value.length > 50) {
|
||||
details.push(indent(key + ': ' + value))
|
||||
details.push(indent(key + ': ' + value));
|
||||
} else if (!stringified && (value.indexOf(' ') != -1 ||
|
||||
value.length === 0)) {
|
||||
extras.push(key + '=' + JSON.stringify(value))
|
||||
extras.push(key + '=' + JSON.stringify(value));
|
||||
} else {
|
||||
extras.push(key + '=' + value)
|
||||
extras.push(key + '=' + value);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -531,7 +624,7 @@ process.stdout.on("error", function (err) {
|
|||
if (err.code === "EPIPE") {
|
||||
// Pass. See <https://github.com/trentm/json/issues/9>.
|
||||
} else {
|
||||
warn(err)
|
||||
warn(err);
|
||||
drainStdoutAndExit(1);
|
||||
}
|
||||
});
|
||||
|
@ -578,17 +671,17 @@ function processStdin(opts, stylize, callback) {
|
|||
}
|
||||
|
||||
if (length > 1) {
|
||||
handleLogLine(leftover + lines[0], opts, stylize);
|
||||
handleLogLine(null, leftover + lines[0], opts, stylize);
|
||||
}
|
||||
leftover = lines.pop();
|
||||
length -= 1;
|
||||
for (var i=1; i < length; i++) {
|
||||
handleLogLine(lines[i], opts, stylize);
|
||||
handleLogLine(null, lines[i], opts, stylize);
|
||||
}
|
||||
});
|
||||
stdin.on('end', function () {
|
||||
if (leftover) {
|
||||
handleLogLine(leftover, opts, stylize);
|
||||
handleLogLine(null, leftover, opts, stylize);
|
||||
leftover = '';
|
||||
}
|
||||
callback();
|
||||
|
@ -607,9 +700,11 @@ function processStdin(opts, stylize, callback) {
|
|||
function processFile(file, opts, stylize, callback) {
|
||||
var stream = fs.createReadStream(file, {encoding: 'utf8'});
|
||||
|
||||
streams[file].stream = stream;
|
||||
|
||||
stream.on('error', function (err) {
|
||||
callback(err);
|
||||
})
|
||||
});
|
||||
|
||||
var leftover = ''; // Left-over partial line from last chunk.
|
||||
stream.on('data', function (chunk) {
|
||||
|
@ -621,20 +716,21 @@ function processFile(file, opts, stylize, callback) {
|
|||
}
|
||||
|
||||
if (length > 1) {
|
||||
handleLogLine(leftover + lines[0], opts, stylize);
|
||||
handleLogLine(file, leftover + lines[0], opts, stylize);
|
||||
}
|
||||
leftover = lines.pop();
|
||||
length -= 1;
|
||||
for (var i=1; i < length; i++) {
|
||||
handleLogLine(lines[i], opts, stylize);
|
||||
handleLogLine(file, lines[i], opts, stylize);
|
||||
}
|
||||
});
|
||||
|
||||
stream.on('end', function () {
|
||||
if (leftover) {
|
||||
handleLogLine(leftover, opts, stylize);
|
||||
handleLogLine(file, leftover, opts, stylize);
|
||||
leftover = '';
|
||||
}
|
||||
streams[file].done = true;
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
@ -643,13 +739,14 @@ function processFile(file, opts, stylize, callback) {
|
|||
/**
|
||||
* From node async module.
|
||||
*/
|
||||
function asyncForEachSeries(arr, iterator, callback) {
|
||||
function asyncForEach(arr, iterator, callback) {
|
||||
callback = callback || function () {};
|
||||
if (!arr.length) {
|
||||
return callback();
|
||||
}
|
||||
var completed = 0;
|
||||
var iterate = function () {
|
||||
iterator(arr[completed], function (err) {
|
||||
arr.forEach(function (x) {
|
||||
iterator(x, function (err) {
|
||||
if (err) {
|
||||
callback(err);
|
||||
callback = function () {};
|
||||
|
@ -659,13 +756,9 @@ function asyncForEachSeries(arr, iterator, callback) {
|
|||
if (completed === arr.length) {
|
||||
callback();
|
||||
}
|
||||
else {
|
||||
iterate();
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
iterate();
|
||||
});
|
||||
};
|
||||
|
||||
|
||||
|
@ -677,7 +770,7 @@ function main(argv) {
|
|||
try {
|
||||
opts = parseArgv(argv);
|
||||
} catch (e) {
|
||||
warn("bunyan: error: %s", e.message)
|
||||
warn("bunyan: error: %s", e.message);
|
||||
return drainStdoutAndExit(1);
|
||||
}
|
||||
if (opts.help) {
|
||||
|
@ -693,8 +786,10 @@ function main(argv) {
|
|||
var retval = 0;
|
||||
if (opts.args.length > 0) {
|
||||
var files = opts.args;
|
||||
var i = 0;
|
||||
asyncForEachSeries(files,
|
||||
files.forEach(function (file) {
|
||||
streams[file] = { stream: null, records: [], done: false }
|
||||
});
|
||||
asyncForEach(files,
|
||||
function (file, next) {
|
||||
processFile(file, opts, stylize, function (err) {
|
||||
if (err) {
|
||||
|
@ -715,7 +810,7 @@ function main(argv) {
|
|||
} else {
|
||||
processStdin(opts, stylize, function () {
|
||||
process.exit(retval);
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
12
examples/ringbuffer.js
Normal file
12
examples/ringbuffer.js
Normal file
|
@ -0,0 +1,12 @@
|
|||
/* Create a ring buffer that stores the last 100 entries. */
|
||||
var bunyan = require('..');
|
||||
var ringbuffer = new bunyan.RingBuffer({ limit: 100 });
|
||||
var log = new bunyan({
|
||||
name: 'foo',
|
||||
raw: true,
|
||||
stream: ringbuffer,
|
||||
level: 'debug'
|
||||
});
|
||||
|
||||
log.info('hello world');
|
||||
console.log(ringbuffer.entries);
|
|
@ -975,6 +975,50 @@ var errSerializer = Logger.stdSerializers.err = function err(err) {
|
|||
};
|
||||
|
||||
|
||||
/**
|
||||
* RingBuffer is a Writable Stream that just stores the last N entries in
|
||||
* memory.
|
||||
*
|
||||
* @param options {Object}, with the following fields:
|
||||
*
|
||||
* - limit: number of entries to keep in memory
|
||||
*/
|
||||
function RingBuffer(options) {
|
||||
this.limit = options && options.limit ? options.limit : 100;
|
||||
this.writable = true;
|
||||
this.entries = [];
|
||||
EventEmitter.call(this);
|
||||
}
|
||||
|
||||
util.inherits(RingBuffer, EventEmitter);
|
||||
|
||||
RingBuffer.prototype.write = function (record) {
|
||||
if (!this.writable)
|
||||
throw (new Error('RingBuffer has been ended already'));
|
||||
|
||||
this.entries.push(record);
|
||||
|
||||
if (this.entries.length > this.limit)
|
||||
this.entries.shift();
|
||||
|
||||
return (true);
|
||||
};
|
||||
|
||||
RingBuffer.prototype.end = function () {
|
||||
if (arguments.length > 0)
|
||||
this.write.apply(this, Array.prototype.slice.call(arguments));
|
||||
this.writable = false;
|
||||
};
|
||||
|
||||
RingBuffer.prototype.destroy = function () {
|
||||
this.writable = false;
|
||||
this.emit('close');
|
||||
};
|
||||
|
||||
RingBuffer.prototype.destroySoon = function () {
|
||||
this.destroy();
|
||||
};
|
||||
|
||||
|
||||
//---- Exports
|
||||
|
||||
|
@ -993,3 +1037,5 @@ module.exports.LOG_VERSION = LOG_VERSION;
|
|||
module.exports.createLogger = function createLogger(options) {
|
||||
return new Logger(options);
|
||||
};
|
||||
|
||||
module.exports.RingBuffer = RingBuffer;
|
||||
|
|
|
@ -149,3 +149,22 @@ test('simple.log doesnotexist1.log doesnotexist2.log', function (t) {
|
|||
}
|
||||
);
|
||||
});
|
||||
|
||||
test('multiple logs', function (t) {
|
||||
exec(BUNYAN + ' corpus/log1.log corpus/log2.log',
|
||||
function (err, stdout, stderr) {
|
||||
t.error(err);
|
||||
t.equal(stdout, [
|
||||
'[2012-05-08T16:57:55.586Z] INFO: agent1/73267 on headnode: message\n',
|
||||
'[2012-05-08T16:58:55.586Z] INFO: agent2/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:01:49.339Z] INFO: agent2/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:02:47.404Z] INFO: agent2/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:02:49.339Z] INFO: agent1/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:02:49.404Z] INFO: agent1/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:02:57.404Z] INFO: agent2/73267 on headnode: message\n',
|
||||
'[2012-05-08T17:08:01.105Z] INFO: agent2/76156 on headnode: message\n',
|
||||
].join(''));
|
||||
t.end();
|
||||
});
|
||||
});
|
||||
|
|
39
test/ringbuffer.test.js
Normal file
39
test/ringbuffer.test.js
Normal file
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Test the RingBuffer output stream.
|
||||
*/
|
||||
|
||||
var test = require('tap').test;
|
||||
var Logger = require('../lib/bunyan');
|
||||
var ringbuffer = new Logger.RingBuffer({ 'limit': 5 });
|
||||
|
||||
var log1 = new Logger({
|
||||
name: 'log1',
|
||||
streams: [
|
||||
{
|
||||
stream: ringbuffer,
|
||||
raw: true,
|
||||
level: 'info'
|
||||
}
|
||||
]
|
||||
});
|
||||
|
||||
test('ringbuffer', function (t) {
|
||||
log1.info('hello');
|
||||
log1.trace('there');
|
||||
log1.error('android');
|
||||
t.equal(ringbuffer.entries.length, 2);
|
||||
t.equal(ringbuffer.entries[0]['msg'], 'hello');
|
||||
t.equal(ringbuffer.entries[1]['msg'], 'android');
|
||||
log1.error('one');
|
||||
log1.error('two');
|
||||
log1.error('three');
|
||||
t.equal(ringbuffer.entries.length, 5);
|
||||
log1.error('four');
|
||||
t.equal(ringbuffer.entries.length, 5);
|
||||
t.equal(ringbuffer.entries[0]['msg'], 'android');
|
||||
t.equal(ringbuffer.entries[1]['msg'], 'one');
|
||||
t.equal(ringbuffer.entries[2]['msg'], 'two');
|
||||
t.equal(ringbuffer.entries[3]['msg'], 'three');
|
||||
t.equal(ringbuffer.entries[4]['msg'], 'four');
|
||||
t.end();
|
||||
});
|
Loading…
Reference in a new issue