首页 文章

如何在NodeJs中使用异步数据源创建可读流?

提问于
浏览
17

Environment: NodeJS,Express,DynamoDB(但真的可以是任何数据库)

Scenario: 需要读取大量记录并作为可下载文件返回给用户 . 这意味着我不能一次缓冲所有内容,然后在Express的响应中发送它 . 此外,我可能需要多次执行查询,因为在一个查询中可能不会返回所有数据 .

Proposed Solution: 使用可以通过管道传输到Express中的响应流的可读流 .

我首先创建了一个继承自stream.Readable的对象,并实现了一个推送查询结果的_read()方法 . 问题是_read()中调用的数据库查询是异步的,但stream.read()是同步方法 .

当流通过管道传递给服务器的响应时,在db查询甚至有机会执行之前,会多次调用读取 . 因此,查询被多次调用,即使查询的第一个实例完成并执行push(null),其他查询也会完成,并且在EOF之后出现“push()”错误 .

  • 有没有办法用_read()正确地做到这一点?

  • 我应该忘记_read()并只在构造函数中执行查询和push()结果吗?

  • 我应该执行查询并发出数据事件而不是push()吗?

谢谢

function DynamoDbResultStream(query, options){
    if(!(this instanceof DynamoDbResultStream)){
        return new DynamoDbResultStream(query, options);
    }

    Readable.call(this, options);

    this.dbQuery = query;
    this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function(){
    var self = this;
    if(!this.done){
        dynamoDB.query(this.dbQuery, function(err, data) {
            if (!err) {
                try{
                    for(i=0;i<data.Items.length;i++){
                        self.push(data.Items[i]);
                    }
                }catch(err){
                    console.log(err);
                }
                if (data.LastEvaluatedKey) {
                    //Next read() should invoke the query with a new start key
                    self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
                }else{
                    self.done=true;
                    self.push(null);
                }
            }else{
                 console.log(err);
                 self.emit('error',err);
            }
        });
    }else{
        self.push(null);
    }
};

EDIT: 发布此问题后,我发现这篇文章的答案显示了如何在不使用继承的情况下执行此操作:How to call an asynchronous function inside a node.js readable stream

在那里发表评论说,在_read()中应该只有一个push() . 每个push()通常会生成另一个read()调用 .

1 回答

  • 1

    请注意Stream的不同模式:https://nodejs.org/api/stream.html#stream_two_modes

    const Readable = require('stream').Readable;
    
    // starts in paused mode
    const readable = new Readable();
    
    let i = 0;
    fetchMyAsyncData() {
      setTimeout(() => {
        // still remains in paused mode
        readable.push(++i);
    
        if (i === 5) {
          return readable.emit('end');
        }
    
        fetchMyAsyncData();
      }, 500);    
    }
    
    // "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods."
    app.get('/mystreamingresponse', (req, res) => {
    
      // remains in paused mode
      readable.on('readable', () => res.write(readable.read()));
    
      fetchMyAsyncData();
    
      // closes the response stream once all external data arrived
      readable.on('end', () => res.end());
    })
    

相关问题