From 329d2b94db214177c3e5760b3623834f2ae3e866 Mon Sep 17 00:00:00 2001 From: dead_horse Date: Thu, 10 Apr 2014 00:02:13 +0800 Subject: [PATCH] make sure all intermediate stream bodies will be destroy and have error handle --- lib/application.js | 7 +------ lib/response.js | 9 +++++++++ package.json | 2 ++ test/application.js | 34 +++++++++++++++++++++++++++++++++- test/context.js | 5 +++-- 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/lib/application.js b/lib/application.js index fbf3e87..e600a9b 100644 --- a/lib/application.js +++ b/lib/application.js @@ -202,12 +202,7 @@ function *respond(next) { // Stream body if ('function' == typeof body.pipe) { - if (!~body.listeners('error').indexOf(this.onerror)) body.on('error', this.onerror); - - if (head) { - if (body.close) body.close(); - return res.end(); - } + if (head) return res.end(); return body.pipe(res); } diff --git a/lib/response.js b/lib/response.js index c3df5e9..dbf86f7 100644 --- a/lib/response.js +++ b/lib/response.js @@ -9,6 +9,9 @@ var status = require('statuses'); var http = require('http'); var path = require('path'); var mime = require('mime'); +var onFinished = require('finished'); +var destroy = require('dethroy'); +var inject = require('error-inject'); var basename = path.basename; var extname = path.extname; @@ -130,6 +133,12 @@ module.exports = { // stream if ('function' == typeof val.pipe) { + // make sure all intermediate streams will destroy on finished + onFinished(this, destroy.bind(null, val)); + + // make sure all intermediate streams has an listenner on error + inject(val, this.ctx.onerror); + if (setType) this.type = 'bin'; return; } diff --git a/package.json b/package.json index 103a79e..1a3818c 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,8 @@ "koa-compose": "~2.2.0", "cookies": "~0.4.0", "delegates": "0.0.3", + "dethroy": "~1.0.0", + "error-inject": "~1.0.0", "only": "0.0.2" }, "devDependencies": { diff --git a/test/application.js b/test/application.js index 4b0890d..5389a92 100644 --- a/test/application.js +++ b/test/application.js @@ -261,7 +261,7 @@ describe('app.respond', function(){ res.status = 200; res.write('Hello'); setTimeout(function(){ - res.end("Goodbye") + res.end("Goodbye"); }, 0); }); @@ -432,6 +432,38 @@ describe('app.respond', function(){ .expect(404) .end(done); }) + + it('should handle errors when no content status', function(done){ + var app = koa(); + + app.use(function *(){ + this.status = 204; + this.body = fs.createReadStream('does not exist'); + }); + + var server = app.listen(); + + request(server) + .get('/') + .expect(204, done); + }) + + + it('should handle all intermediate stream body errors', function(done){ + var app = koa(); + + app.use(function *(){ + this.body = fs.createReadStream('does not exist'); + this.body = fs.createReadStream('does not exist'); + this.body = fs.createReadStream('does not exist'); + }); + + var server = app.listen(); + + request(server) + .get('/') + .expect(404, done); + }) }) describe('when .body is an Object', function(){ diff --git a/test/context.js b/test/context.js index 66164f4..bf2e6ce 100644 --- a/test/context.js +++ b/test/context.js @@ -1,11 +1,12 @@ +var ReadableStream = require('stream').Readable; var context = require('../lib/context'); var request = require('../lib/request'); var response = require('../lib/response'); var koa = require('..'); exports = module.exports = function(req, res){ - req = req || { headers: {} }; + req = req || { headers: {}, socket: new ReadableStream() }; res = res || { _headers: {} }; res.setHeader = function(k, v){ res._headers[k.toLowerCase()] = v }; res.removeHeader = function(k, v){ delete res._headers[k.toLowerCase()] }; @@ -22,4 +23,4 @@ exports.request = function(req, res){ exports.response = function(req, res){ return exports(req, res).response; -} \ No newline at end of file +}