asynquence "reactive sequences" for handling http request/response streams
June 7, 2014 · View on GitHub
// Inspired by/adapted from: https://gist.github.com/totherik/4fb1784f008815ac82e1
var http = require("http"); var ASQ = require("asynquence-contrib"); // bring in ASQ + optional contrib plugins
// Module-level handles. `server` is the HTTP server created below; `source`
// is only declared here and is meant to hold the reactive sequence.
var server, source;

// Named settings for the two numeric literals used during start-up.
var SOCKET_TIMEOUT_MS = 30000;
var LISTEN_PORT = 8000;

// Create the server without a request handler (requests are observed through
// the "request" event instead), apply the timeout, and start listening.
server = http.createServer();
server.setTimeout(SOCKET_TIMEOUT_MS);
server.listen(LISTEN_PORT);
// setup reactive listeners
//
// `ASQ.react(..)` receives a `setup(next,registerTeardown)` function and its
// return value is kept in `source`. Inside `setup`:
//   - `next` is registered as the server's "request" listener
//   - `stop` (read from `this`) is registered as the server's "close" listener
//   - `registerTeardown(..)` is given a function that removes both listeners
source = ASQ.react(function setup(next,registerTeardown){
	// Capture the `stop` reference once, so the exact function that was added
	// for "close" is the one removed again in the teardown (instead of adding
	// `this.stop` but removing `source.stop`).
	var stop = this.stop;

	server.addListener("request",next);
	server.addListener("close",stop);

	registerTeardown(function(){
		server.removeListener("request",next);
		server.removeListener("close",stop);
	});
});
// subscription
//
// First step (`onNext`): receives `done` plus the `req`/`res` pair, collects
// the request stream through a nested reactive sequence (`collector`), and
// signals `done()` on "end" or `done.fail(e)` on error.
// Second step (`onComplete`): logs "complete".
// `.or(..)`: logs any error with `console.error`.
source
.then(function onNext(done,req,res){
	req.setEncoding("utf8");
	res.setHeader("Content-Type","text/html");

	// `body` accumulates the received chunks. Nothing in this script reads it
	// back; the response text sent in `onEnd` is a fixed placeholder.
	var body = "", collector;

	// setup reactive listeners
	collector = ASQ.react(function setup(next,registerTeardown){
		next.onStream(req); // listen for standard stream events
		req.on("end",onEnd);
		req.resume(); // is this needed?

		registerTeardown(function(){
			next.unStream(req); // undo standard stream events
			req.removeListener("end",onEnd);
		});
	});

	// subscription
	collector
	.val(onChunk)
	.or(onError);

	// Handles each value delivered by `collector`. An `Error` value is
	// re-thrown so that it is routed to the `.or(onError)` handler; anything
	// else is appended to `body`.
	// (Named `onChunk` so it no longer shadows the enclosing `onNext`.)
	function onChunk(x){
		if (x instanceof Error) throw x;
		body += x;
	}

	// Request finished: respond with 200, stop the collector, and complete
	// this step of the outer sequence.
	function onEnd() {
		res.statusCode = 200;
		res.end(" .. ");
		collector.stop();
		done();
	}

	// Failure: respond with 500 and an empty body, stop the collector, and
	// fail this step of the outer sequence with the error.
	function onError(e) {
		res.statusCode = 500;
		res.end();
		collector.stop();
		done.fail(e);
	}
})
.then(function onComplete(){
	console.log("complete");
})
.or(function onError(e){
	console.error(e);
});
// Graceful shutdown on Ctrl-C.
//
// Installing a "SIGINT" listener removes Node's default "exit on SIGINT"
// behavior, so stopping the reactive sequence alone would leave the listening
// server keeping the process alive. The handler therefore also closes the
// server. `once` is used so that a second Ctrl-C falls back to the default
// behavior and terminates the process immediately.
process.once("SIGINT",function onInterrupt(){
	source.stop();
	server.close();
});