Environment: NodeJS,Express,DynamoDB(但真的可以是任何数据库)
Scenario: 需要读取大量记录并作为可下载文件返回给用户 . 这意味着我不能一次缓冲所有内容,然后在Express的响应中发送它 . 此外,我可能需要多次执行查询,因为在一个查询中可能不会返回所有数据 .
Proposed Solution: 使用可以通过管道传输到Express中的响应流的可读流 .
我首先创建了一个继承自stream.Readable的对象,并实现了一个推送查询结果的_read()方法 . 问题是_read()中调用的数据库查询是异步的,但stream.read()是同步方法 .
当流通过管道传递给服务器的响应时,在db查询甚至有机会执行之前,会多次调用读取 . 因此,查询被多次调用,即使查询的第一个实例完成并执行push(null),其他查询也会完成,并且在EOF之后出现“push()”错误 .
-
有没有办法用_read()正确地做到这一点?
-
我应该忘记_read()并只在构造函数中执行查询和push()结果吗?
-
我应该执行查询并发出数据事件而不是push()吗?
谢谢
function DynamoDbResultStream(query, options){
if(!(this instanceof DynamoDbResultStream)){
return new DynamoDbResultStream(query, options);
}
Readable.call(this, options);
this.dbQuery = query;
this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);
DynamoDbResultStream.prototype._read = function(){
var self = this;
if(!this.done){
dynamoDB.query(this.dbQuery, function(err, data) {
if (!err) {
try{
for(i=0;i<data.Items.length;i++){
self.push(data.Items[i]);
}
}catch(err){
console.log(err);
}
if (data.LastEvaluatedKey) {
//Next read() should invoke the query with a new start key
self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
}else{
self.done=true;
self.push(null);
}
}else{
console.log(err);
self.emit('error',err);
}
});
}else{
self.push(null);
}
};
EDIT: 发布此问题后,我发现这篇文章的答案显示了如何在不使用继承的情况下执行此操作:How to call an asynchronous function inside a node.js readable stream
在那里发表评论说,在_read()中应该只有一个push() . 每个push()通常会生成另一个read()调用 .
1 回答
请注意Stream的不同模式:https://nodejs.org/api/stream.html#stream_two_modes