Merge pull request #255 from dead-horse/stream-body

make sure all intermediate stream bodies will be destroy
This commit is contained in:
TJ Holowaychuk 2014-04-14 15:34:13 -07:00
commit 95a78c3a44
5 changed files with 48 additions and 9 deletions

View file

@ -208,12 +208,7 @@ function *respond(next) {
// Stream body // Stream body
if ('function' == typeof body.pipe) { if ('function' == typeof body.pipe) {
if (!~body.listeners('error').indexOf(this.onerror)) body.on('error', this.onerror); if (head) return res.end();
if (head) {
if (body.close) body.close();
return res.end();
}
return body.pipe(res); return body.pipe(res);
} }

View file

@ -9,6 +9,9 @@ var status = require('statuses');
var http = require('http'); var http = require('http');
var path = require('path'); var path = require('path');
var mime = require('mime'); var mime = require('mime');
var onFinished = require('finished');
var destroy = require('dethroy');
var inject = require('error-inject');
var basename = path.basename; var basename = path.basename;
var extname = path.extname; var extname = path.extname;
@ -129,6 +132,12 @@ module.exports = {
// stream // stream
if ('function' == typeof val.pipe) { if ('function' == typeof val.pipe) {
// make sure all intermediate streams will destroy on finished
onFinished(this, destroy.bind(null, val));
// make sure all intermediate streams has an listenner on error
inject(val, this.ctx.onerror);
if (setType) this.type = 'bin'; if (setType) this.type = 'bin';
return; return;
} }

View file

@ -33,6 +33,8 @@
"koa-compose": "~2.2.0", "koa-compose": "~2.2.0",
"cookies": "~0.4.0", "cookies": "~0.4.0",
"delegates": "0.0.3", "delegates": "0.0.3",
"dethroy": "~1.0.0",
"error-inject": "~1.0.0",
"only": "0.0.2" "only": "0.0.2"
}, },
"devDependencies": { "devDependencies": {

View file

@ -261,7 +261,7 @@ describe('app.respond', function(){
res.status = 200; res.status = 200;
res.write('Hello'); res.write('Hello');
setTimeout(function(){ setTimeout(function(){
res.end("Goodbye") res.end("Goodbye");
}, 0); }, 0);
}); });
@ -522,6 +522,38 @@ describe('app.respond', function(){
.expect(404) .expect(404)
.end(done); .end(done);
}) })
it('should handle errors when no content status', function(done){
var app = koa();
app.use(function *(){
this.status = 204;
this.body = fs.createReadStream('does not exist');
});
var server = app.listen();
request(server)
.get('/')
.expect(204, done);
})
it('should handle all intermediate stream body errors', function(done){
var app = koa();
app.use(function *(){
this.body = fs.createReadStream('does not exist');
this.body = fs.createReadStream('does not exist');
this.body = fs.createReadStream('does not exist');
});
var server = app.listen();
request(server)
.get('/')
.expect(404, done);
})
}) })
describe('when .body is an Object', function(){ describe('when .body is an Object', function(){

View file

@ -1,11 +1,12 @@
var ReadableStream = require('stream').Readable;
var context = require('../lib/context'); var context = require('../lib/context');
var request = require('../lib/request'); var request = require('../lib/request');
var response = require('../lib/response'); var response = require('../lib/response');
var koa = require('..'); var koa = require('..');
exports = module.exports = function(req, res){ exports = module.exports = function(req, res){
req = req || { headers: {} }; req = req || { headers: {}, socket: new ReadableStream() };
res = res || { _headers: {} }; res = res || { _headers: {} };
res.setHeader = function(k, v){ res._headers[k.toLowerCase()] = v }; res.setHeader = function(k, v){ res._headers[k.toLowerCase()] = v };
res.removeHeader = function(k, v){ delete res._headers[k.toLowerCase()] }; res.removeHeader = function(k, v){ delete res._headers[k.toLowerCase()] };
@ -22,4 +23,4 @@ exports.request = function(req, res){
exports.response = function(req, res){ exports.response = function(req, res){
return exports(req, res).response; return exports(req, res).response;
} }